// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package checksum

import (
	"context"
	"slices"

	"github.com/gogo/protobuf/proto"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/metautil"
	"github.com/pingcap/tidb/br/pkg/utils"
	"github.com/pingcap/tidb/pkg/distsql"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/sessionctx/vardef"
	"github.com/pingcap/tidb/pkg/tablecodec"
	"github.com/pingcap/tidb/pkg/util/ranger"
	"github.com/pingcap/tipb/go-tipb"
	"github.com/tikv/client-go/v2/util"
	"go.uber.org/zap"
)

// ExecutorBuilder is used to build a "kv.Request".
type ExecutorBuilder struct {
	table *model.TableInfo
	ts    uint64

	oldTable *metautil.Table

	concurrency   uint
	backoffWeight int

	oldKeyspace []byte
	newKeyspace []byte

	resourceGroupName string
	requestSource     util.RequestSource
}

// NewExecutorBuilder returns a new executor builder.
func NewExecutorBuilder(table *model.TableInfo, ts uint64) *ExecutorBuilder {
	return &ExecutorBuilder{
		table: table,
		ts:    ts,

		concurrency: vardef.DefDistSQLScanConcurrency,
	}
}

// SetOldTable sets an old table info for the builder.
func (builder *ExecutorBuilder) SetOldTable(oldTable *metautil.Table) *ExecutorBuilder {
	builder.oldTable = oldTable
	return builder
}

// SetConcurrency sets the concurrency of the checksum execution.
func (builder *ExecutorBuilder) SetConcurrency(conc uint) *ExecutorBuilder {
	builder.concurrency = conc
	return builder
}

// SetBackoffWeight sets the backoff weight of the checksum execution.
func (builder *ExecutorBuilder) SetBackoffWeight(backoffWeight int) *ExecutorBuilder {
	builder.backoffWeight = backoffWeight
	return builder
}

// SetOldKeyspace sets the keyspace prefix of the backup cluster, which is
// prepended to the old key prefixes in the checksum rewrite rules.
func (builder *ExecutorBuilder) SetOldKeyspace(keyspace []byte) *ExecutorBuilder {
	builder.oldKeyspace = keyspace
	return builder
}

// SetNewKeyspace sets the keyspace prefix of the restore cluster, which is
// prepended to the new key prefixes in the checksum rewrite rules.
func (builder *ExecutorBuilder) SetNewKeyspace(keyspace []byte) *ExecutorBuilder {
	builder.newKeyspace = keyspace
	return builder
}

// SetResourceGroupName sets the resource group name carried by the checksum
// requests.
func (builder *ExecutorBuilder) SetResourceGroupName(name string) *ExecutorBuilder {
	builder.resourceGroupName = name
	return builder
}

// SetRequestSource sets the request source carried by the checksum requests.
func (builder *ExecutorBuilder) SetRequestSource(reqSource util.RequestSource) *ExecutorBuilder {
	builder.requestSource = reqSource
	return builder
}

// SetExplicitRequestSourceType sets the explicit type of the request source.
func (builder *ExecutorBuilder) SetExplicitRequestSourceType(name string) *ExecutorBuilder {
	builder.requestSource.ExplicitRequestSourceType = name
	return builder
}

// Build builds a checksum executor.
func (builder *ExecutorBuilder) Build() (*Executor, error) {
	reqs, err := buildChecksumRequest(
		builder.table,
		builder.oldTable,
		builder.ts,
		builder.concurrency,
		builder.oldKeyspace,
		builder.newKeyspace,
		builder.resourceGroupName,
		builder.requestSource,
	)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return &Executor{reqs: reqs, backoffWeight: builder.backoffWeight}, nil
}
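
// A minimal usage sketch (tblInfo and startTS are placeholders assumed to
// come from the caller):
//
//	exec, err := NewExecutorBuilder(tblInfo, startTS).
//		SetConcurrency(4).
//		Build()
//	if err != nil {
//		return errors.Trace(err)
//	}
//	// exec.Execute(...) then runs the requests; see Execute below.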

func buildChecksumRequest(
	newTable *model.TableInfo,
	oldTable *metautil.Table,
	startTS uint64,
	concurrency uint,
	oldKeyspace []byte,
	newKeyspace []byte,
	resourceGroupName string,
	requestSource util.RequestSource,
) ([]*kv.Request, error) {
	var partDefs []model.PartitionDefinition
	if part := newTable.Partition; part != nil {
		partDefs = part.Definitions
	}

	reqs := make([]*kv.Request, 0, (len(newTable.Indices)+1)*(len(partDefs)+1))
	var oldTableID int64
	if oldTable != nil {
		oldTableID = oldTable.Info.ID
	}
	rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS, concurrency,
		oldKeyspace, newKeyspace, resourceGroupName, requestSource)
	if err != nil {
		return nil, errors.Trace(err)
	}
	reqs = append(reqs, rs...)

	for _, partDef := range partDefs {
		var oldPartID int64
		if oldTable != nil {
			oldPartID, err = utils.GetPartitionByName(oldTable.Info, partDef.Name)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
		rs, err := buildRequest(newTable, partDef.ID, oldTable, oldPartID, startTS, concurrency,
			oldKeyspace, newKeyspace, resourceGroupName, requestSource)
		if err != nil {
			return nil, errors.Trace(err)
		}
		reqs = append(reqs, rs...)
	}

	return reqs, nil
}
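
// For example, a table with one public index and two partitions yields up to
// (1+1)*(2+1) = 6 requests: one data and one index request for the logical
// table ID, plus the same pair for each partition.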

func buildRequest(
	tableInfo *model.TableInfo,
	tableID int64,
	oldTable *metautil.Table,
	oldTableID int64,
	startTS uint64,
	concurrency uint,
	oldKeyspace []byte,
	newKeyspace []byte,
	resourceGroupName string,
	requestSource util.RequestSource,
) ([]*kv.Request, error) {
	reqs := make([]*kv.Request, 0)
	req, err := buildTableRequest(tableInfo, tableID, oldTable, oldTableID, startTS, concurrency,
		oldKeyspace, newKeyspace, resourceGroupName, requestSource)
	if err != nil {
		return nil, errors.Trace(err)
	}
	reqs = append(reqs, req)

	for _, indexInfo := range tableInfo.Indices {
		if indexInfo.State != model.StatePublic {
			continue
		}
		var oldIndexInfo *model.IndexInfo
		if oldTable != nil {
			for _, oldIndex := range oldTable.Info.Indices {
				if oldIndex.Name == indexInfo.Name {
					oldIndexInfo = oldIndex
					break
				}
			}
			if oldIndexInfo == nil {
				log.Panic("index not found in the origin table, "+
					"please check that the restored table has the same index info as the origin table",
					zap.Int64("table id", tableID),
					zap.Stringer("table name", tableInfo.Name),
					zap.Int64("origin table id", oldTableID),
					zap.Stringer("origin table name", oldTable.Info.Name),
					zap.Stringer("index name", indexInfo.Name))
			}
		}
		req, err = buildIndexRequest(
			tableID, indexInfo, oldTableID, oldIndexInfo, startTS, concurrency,
			oldKeyspace, newKeyspace, resourceGroupName, requestSource)
		if err != nil {
			return nil, errors.Trace(err)
		}
		reqs = append(reqs, req)
	}

	return reqs, nil
}

func buildTableRequest(
	tableInfo *model.TableInfo,
	tableID int64,
	oldTable *metautil.Table,
	oldTableID int64,
	startTS uint64,
	concurrency uint,
	oldKeyspace []byte,
	newKeyspace []byte,
	resourceGroupName string,
	requestSource util.RequestSource,
) (*kv.Request, error) {
	var rule *tipb.ChecksumRewriteRule
	if oldTable != nil {
		rule = &tipb.ChecksumRewriteRule{
			OldPrefix: slices.Concat(oldKeyspace, tablecodec.GenTableRecordPrefix(oldTableID)),
			NewPrefix: slices.Concat(newKeyspace, tablecodec.GenTableRecordPrefix(tableID)),
		}
	}

	checksum := &tipb.ChecksumRequest{
		ScanOn:    tipb.ChecksumScanOn_Table,
		Algorithm: tipb.ChecksumAlgorithm_Crc64_Xor,
		Rule:      rule,
	}

	var ranges []*ranger.Range
	if tableInfo.IsCommonHandle {
		ranges = ranger.FullNotNullRange()
	} else {
		ranges = ranger.FullIntRange(false)
	}

	var builder distsql.RequestBuilder
	// Use low priority to reduce the impact on other requests.
	builder.Request.Priority = kv.PriorityLow
	return builder.SetHandleRanges(nil, tableID, tableInfo.IsCommonHandle, ranges).
		SetStartTS(startTS).
		SetChecksumRequest(checksum).
		SetConcurrency(int(concurrency)).
		SetResourceGroupName(resourceGroupName).
		SetRequestSource(requestSource).
		Build()
}
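
// As a sketch of the rewrite rule above: restoring old table ID 42 as new
// table ID 108 (with empty keyspaces) produces
//
//	OldPrefix: tablecodec.GenTableRecordPrefix(42)  // "t" + enc(42) + "_r"
//	NewPrefix: tablecodec.GenTableRecordPrefix(108) // "t" + enc(108) + "_r"
//
// so TiKV checksums the restored keys as if they still carried the backup
// cluster's table ID, keeping the result comparable with the backup checksum.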

func buildIndexRequest(
	tableID int64,
	indexInfo *model.IndexInfo,
	oldTableID int64,
	oldIndexInfo *model.IndexInfo,
	startTS uint64,
	concurrency uint,
	oldKeyspace []byte,
	newKeyspace []byte,
	resourceGroupName string,
	requestSource util.RequestSource,
) (*kv.Request, error) {
	var rule *tipb.ChecksumRewriteRule
	if oldIndexInfo != nil {
		rule = &tipb.ChecksumRewriteRule{
			OldPrefix: slices.Concat(oldKeyspace, tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID)),
			NewPrefix: slices.Concat(newKeyspace, tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID)),
		}
	}

	checksum := &tipb.ChecksumRequest{
		ScanOn:    tipb.ChecksumScanOn_Index,
		Algorithm: tipb.ChecksumAlgorithm_Crc64_Xor,
		Rule:      rule,
	}

	ranges := ranger.FullRange()

	var builder distsql.RequestBuilder
	// Use low priority to reduce the impact on other requests.
	builder.Request.Priority = kv.PriorityLow
	return builder.SetIndexRanges(nil, tableID, indexInfo.ID, ranges).
		SetStartTS(startTS).
		SetChecksumRequest(checksum).
		SetConcurrency(int(concurrency)).
		SetResourceGroupName(resourceGroupName).
		SetRequestSource(requestSource).
		Build()
}

func sendChecksumRequest(
	ctx context.Context, client kv.Client, req *kv.Request, vars *kv.Variables,
) (resp *tipb.ChecksumResponse, err error) {
	res, err := distsql.Checksum(ctx, client, req, vars)
	if err != nil {
		return nil, errors.Trace(err)
	}
	defer func() {
		if err1 := res.Close(); err1 != nil {
			err = err1
		}
	}()

	resp = &tipb.ChecksumResponse{}

	// Merge the partial checksum responses as they stream back.
	for {
		data, err := res.NextRaw(ctx)
		if err != nil {
			return nil, errors.Trace(err)
		}
		if data == nil {
			break
		}
		checksum := &tipb.ChecksumResponse{}
		if err = checksum.Unmarshal(data); err != nil {
			return nil, errors.Trace(err)
		}
		updateChecksumResponse(resp, checksum)
	}

	return resp, nil
}

func updateChecksumResponse(resp, update *tipb.ChecksumResponse) {
	resp.Checksum ^= update.Checksum
	resp.TotalKvs += update.TotalKvs
	resp.TotalBytes += update.TotalBytes
}
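
// Since Crc64_Xor checksums combine with XOR, merging is commutative and
// order-independent. For example, folding the partial responses
// {Checksum: 5, TotalKvs: 10} and {Checksum: 9, TotalKvs: 7} into an empty
// response yields {Checksum: 5 ^ 9 = 12, TotalKvs: 17}.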

// Executor is a checksum executor.
type Executor struct {
	reqs          []*kv.Request
	backoffWeight int
}

// Len returns the total number of checksum requests.
func (exec *Executor) Len() int {
	return len(exec.reqs)
}

// Each applies the function to each request in the executor.
func (exec *Executor) Each(f func(*kv.Request) error) error {
	for _, req := range exec.reqs {
		err := f(req)
		if err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

// RawRequests extracts the raw requests associated with this executor.
// It is mainly used for debugging.
func (exec *Executor) RawRequests() ([]*tipb.ChecksumRequest, error) {
	res := make([]*tipb.ChecksumRequest, 0, len(exec.reqs))
	for _, req := range exec.reqs {
		rawReq := new(tipb.ChecksumRequest)
		if err := proto.Unmarshal(req.Data, rawReq); err != nil {
			return nil, errors.Trace(err)
		}
		res = append(res, rawReq)
	}
	return res, nil
}

// Execute executes a checksum executor.
func (exec *Executor) Execute(
	ctx context.Context,
	client kv.Client,
	updateFn func(),
) (*tipb.ChecksumResponse, error) {
	checksumResp := &tipb.ChecksumResponse{}
	for _, req := range exec.reqs {
		// killed stands in for a pointer to SessionVars.Killed, the flag that
		// marks a query as killed. It matters in TiDB, but is only a
		// placeholder in BR.
		killed := uint32(0)
		var (
			resp *tipb.ChecksumResponse
			err  error
		)
		err = utils.WithRetry(ctx, func() error {
			vars := kv.NewVariables(&killed)
			if exec.backoffWeight > 0 {
				vars.BackOffWeight = exec.backoffWeight
			}
			resp, err = sendChecksumRequest(ctx, client, req, vars)
			failpoint.Inject("checksumRetryErr", func(val failpoint.Value) {
				// Inject an error when the failpoint value is true, to
				// exercise the retry path.
				if val.(bool) {
					err = errors.New("inject checksum error")
				}
			})
			if err != nil {
				return errors.Trace(err)
			}
			return nil
		}, utils.NewChecksumBackoffStrategy())
		if err != nil {
			return nil, errors.Trace(err)
		}
		updateChecksumResponse(checksumResp, resp)
		updateFn()
	}
	return checksumResp, checkContextDone(ctx)
}
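
// A usage sketch (ctx, store, and bar are placeholders assumed to come from
// the caller; store is a kv.Storage):
//
//	resp, err := exec.Execute(ctx, store.GetClient(), func() {
//		bar.Increment() // advance a progress bar once per finished request
//	})
//	if err != nil {
//		return errors.Trace(err)
//	}
//	log.Info("checksum done", zap.Uint64("crc64xor", resp.Checksum),
//		zap.Uint64("total kvs", resp.TotalKvs))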

// The coprocessor won't return an error if the context is done, so BR could
// otherwise get an incomplete result.
// checkContextDone makes sure the result was not truncated by a done context.
func checkContextDone(ctx context.Context) error {
	ctxErr := ctx.Err()
	if ctxErr != nil {
		return errors.Annotate(ctxErr, "context is cancelled by other error")
	}
	return nil
}