store/copr: balance region for batch cop task (#24521)

xufei
2021-05-18 18:21:40 +08:00
committed by GitHub
parent 5fd17dd7f4
commit 66c8cd96b0
4 changed files with 321 additions and 52 deletions

View File

@@ -16,6 +16,8 @@ package copr
import (
"context"
"io"
"math"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -25,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
@@ -40,8 +43,9 @@ import (
type batchCopTask struct {
storeAddr string
cmdType tikvrpc.CmdType
ctx *tikv.RPCContext
copTasks []copTaskAndRPCContext
regionInfos []tikv.RegionInfo
}
type batchCopResponse struct {
@@ -93,9 +97,152 @@ func (rs *batchCopResponse) RespTime() time.Duration {
return rs.respTime
}
type copTaskAndRPCContext struct {
task *copTask
ctx *tikv.RPCContext
// balanceBatchCopTask balances the regions between the available stores. The basic rules are:
// 1. the first region of each original batch cop task stays with its original store, because some
// metadata in the batchCopTask (like the RPC context) is tied to that store
// 2. for each remaining region:
// if only one store is valid for it, keep the region with its original store
// otherwise, use a greedy algorithm to assign it to the valid store with the lowest weighted region count
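//
// A hypothetical example (illustrative only, not taken from this change): store s1 originally gets
// regions r1..r4 and store s2 only gets r5, with every region replicated on both stores. r1 and r5 stay
// where they are (rule 1); r2..r4 become candidates for both stores, and the greedy loop below hands
// them out one by one to whichever store currently has the lowest weighted region count, ending up with
// roughly a 3/2 split instead of the original 4/1.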
func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
if len(originalTasks) <= 1 {
return originalTasks
}
storeTaskMap := make(map[uint64]*batchCopTask)
storeCandidateRegionMap := make(map[uint64]map[string]tikv.RegionInfo)
totalRegionCandidateNum := 0
totalRemainingRegionNum := 0
for _, task := range originalTasks {
taskStoreID := task.regionInfos[0].AllStores[0]
batchTask := &batchCopTask{
storeAddr: task.storeAddr,
cmdType: task.cmdType,
ctx: task.ctx,
regionInfos: []tikv.RegionInfo{task.regionInfos[0]},
}
storeTaskMap[taskStoreID] = batchTask
}
for _, task := range originalTasks {
taskStoreID := task.regionInfos[0].AllStores[0]
for index, ri := range task.regionInfos {
// for each region, figure out the valid store num
validStoreNum := 0
if index == 0 {
continue
}
if len(ri.AllStores) <= 1 {
validStoreNum = 1
} else {
for _, storeID := range ri.AllStores {
if _, ok := storeTaskMap[storeID]; ok {
validStoreNum++
}
}
}
if validStoreNum == 1 {
// if only one store is valid, just put the region into storeTaskMap
storeTaskMap[taskStoreID].regionInfos = append(storeTaskMap[taskStoreID].regionInfos, ri)
} else {
// if more than one store is valid, put the region
// into the store candidate map
totalRegionCandidateNum += validStoreNum
totalRemainingRegionNum += 1
taskKey := ri.Region.String()
for _, storeID := range ri.AllStores {
if _, validStore := storeTaskMap[storeID]; !validStore {
continue
}
if _, ok := storeCandidateRegionMap[storeID]; !ok {
candidateMap := make(map[string]tikv.RegionInfo)
storeCandidateRegionMap[storeID] = candidateMap
}
if _, duplicateRegion := storeCandidateRegionMap[storeID][taskKey]; duplicateRegion {
// duplicated region, should not happen, just give up balancing
logutil.BgLogger().Warn("Met duplicated region info when trying to balance batch cop task, give up balancing")
return originalTasks
}
storeCandidateRegionMap[storeID][taskKey] = ri
}
}
}
}
if totalRemainingRegionNum == 0 {
return originalTasks
}
avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
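// findNextStore picks the store that should receive the next candidate region. A store's weight is its
// pending candidate count divided by avgStorePerRegion plus the number of regions already assigned to
// it, and the store with the smallest weight wins. With hypothetical numbers, say avgStorePerRegion is 2:
// a store with 4 candidates and 1 assigned region weighs 4/2+1 = 3, the same as a store with 2 candidates
// and 2 assigned regions (2/2+2 = 3), in which case the store examined first keeps the tie.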
findNextStore := func(candidateStores []uint64) uint64 {
store := uint64(math.MaxUint64)
weightedRegionNum := math.MaxFloat64
if candidateStores != nil {
for _, storeID := range candidateStores {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
if store != uint64(math.MaxUint64) {
return store
}
}
for storeID := range storeTaskMap {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
return store
}
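// Greedily hand out the remaining candidate regions: pick the lightest store, move one of its candidate
// regions into its task, remove that region from every store's candidate map, refresh avgStorePerRegion,
// and then choose the next store among the stores touched by the region that was just assigned.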
store := findNextStore(nil)
for totalRemainingRegionNum > 0 {
if store == uint64(math.MaxUint64) {
break
}
var key string
var ri tikv.RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
}
storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri)
totalRemainingRegionNum--
for _, id := range ri.AllStores {
if _, ok := storeCandidateRegionMap[id]; ok {
delete(storeCandidateRegionMap[id], key)
totalRegionCandidateNum--
if len(storeCandidateRegionMap[id]) == 0 {
delete(storeCandidateRegionMap, id)
}
}
}
if totalRemainingRegionNum > 0 {
avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
// this is not optimal because we only check the stores affected by this region; to find the store
// with the lowest weightedRegionNum, all stores should be checked, but checking only the affected
// stores is simpler and gives a good enough result
store = findNextStore(ri.AllStores)
}
}
if totalRemainingRegionNum > 0 {
logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing")
return originalTasks
}
var ret []*batchCopTask
for _, task := range storeTaskMap {
ret = append(ret, task)
}
return ret
}
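// buildBatchCopTasks fetches all regions covering the given ranges, groups them into one batchCopTask
// per TiFlash store (keyed by the address of the chosen peer), and then calls balanceBatchCopTask to
// even out the number of regions handled by each store.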
func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
@@ -138,13 +285,15 @@ func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.Key
// Then `splitRegion` will reload these regions.
continue
}
allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store)
if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
batchCop.copTasks = append(batchCop.copTasks, copTaskAndRPCContext{task: task, ctx: rpcCtx})
batchCop.regionInfos = append(batchCop.regionInfos, tikv.RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores})
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
copTasks: []copTaskAndRPCContext{{task, rpcCtx}},
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
ctx: rpcCtx,
regionInfos: []tikv.RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}},
}
storeTaskMap[rpcCtx.Addr] = batchTask
}
@@ -159,9 +308,25 @@ func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.Key
}
continue
}
for _, task := range storeTaskMap {
batchTasks = append(batchTasks, task)
}
if log.GetLevel() <= zap.DebugLevel {
msg := "Before region balance:"
for _, task := range batchTasks {
msg += " store " + task.storeAddr + ": " + strconv.Itoa(len(task.regionInfos)) + " regions,"
}
logutil.BgLogger().Debug(msg)
}
batchTasks = balanceBatchCopTask(batchTasks)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
for _, task := range batchTasks {
msg += " store " + task.storeAddr + ": " + strconv.Itoa(len(task.regionInfos)) + " regions,"
}
logutil.BgLogger().Debug(msg)
}
if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
logutil.BgLogger().Warn("buildBatchCopTasks takes too much time",
@@ -311,8 +476,8 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *
// Merge all ranges and request again.
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
var ranges []tikvstore.KeyRange
for _, taskCtx := range batchTask.copTasks {
taskCtx.task.ranges.Do(func(ran *tikvstore.KeyRange) {
for _, ri := range batchTask.regionInfos {
ri.Ranges.Do(func(ran *tikvstore.KeyRange) {
ranges = append(ranges, *ran)
})
}
@@ -320,16 +485,16 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer,
}
func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.copTasks))
for _, task := range task.copTasks {
sender := tikv.NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos))
for _, ri := range task.regionInfos {
regionInfos = append(regionInfos, &coprocessor.RegionInfo{
RegionId: task.task.region.GetID(),
RegionId: ri.Region.GetID(),
RegionEpoch: &metapb.RegionEpoch{
ConfVer: task.task.region.GetConfVer(),
Version: task.task.region.GetVer(),
ConfVer: ri.Region.GetConfVer(),
Version: ri.Region.GetVer(),
},
Ranges: task.task.ranges.ToPBRanges(),
Ranges: ri.Ranges.ToPBRanges(),
})
}
@@ -351,13 +516,14 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta
})
req.StoreTp = tikvrpc.TiFlash
logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.copTasks)))
resp, retry, cancel, err := sender.sendStreamReqToAddr(bo, task.copTasks, req, tikv.ReadTimeoutUltraLong)
logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.regionInfos)))
resp, retry, cancel, err := sender.SendReqToAddr(bo.TiKVBackoffer(), task.ctx, task.regionInfos, req, tikv.ReadTimeoutUltraLong)
// If there are store errors, we should retry for all regions.
if retry {
return b.retryBatchCopTask(ctx, bo, task)
}
if err != nil {
err = derr.ToTiDBErr(err)
return nil, errors.Trace(err)
}
defer cancel()

View File

@@ -180,14 +180,14 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
var regionInfos []*coprocessor.RegionInfo
originalTask, ok := req.Meta.(*batchCopTask)
if ok {
for _, task := range originalTask.copTasks {
for _, ri := range originalTask.regionInfos {
regionInfos = append(regionInfos, &coprocessor.RegionInfo{
RegionId: task.task.region.GetID(),
RegionId: ri.Region.GetID(),
RegionEpoch: &metapb.RegionEpoch{
ConfVer: task.task.region.GetConfVer(),
Version: task.task.region.GetVer(),
ConfVer: ri.Region.GetConfVer(),
Version: ri.Region.GetVer(),
},
Ranges: task.task.ranges.ToPBRanges(),
Ranges: ri.Ranges.ToPBRanges(),
})
}
}
@@ -214,8 +214,8 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
// Or else it's the task without region, which always happens in high layer task without table.
// In that case
if originalTask != nil {
sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient())
rpcResp, _, _, err = sender.sendStreamReqToAddr(bo, originalTask.copTasks, wrappedReq, tikv.ReadTimeoutMedium)
sender := tikv.NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient())
rpcResp, _, _, err = sender.SendReqToAddr(bo.TiKVBackoffer(), originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium)
// No matter what the rpc error is, we won't retry the mpp dispatch tasks.
// TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
// That's a hard job but we can try it in the future.

View File

@@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package copr
package tikv
import (
"context"
@@ -19,45 +19,52 @@ import (
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/kvproto/pkg/metapb"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// RegionInfo contains region related information for batchCopTask
type RegionInfo struct {
Region RegionVerID
Meta *metapb.Region
Ranges *KeyRanges
AllStores []uint64
}
// RegionBatchRequestSender sends BatchCop requests to the TiFlash server in a streaming way.
type RegionBatchRequestSender struct {
*tikv.RegionRequestSender
*RegionRequestSender
}
// NewRegionBatchRequestSender creates a RegionBatchRequestSender object.
func NewRegionBatchRequestSender(cache *tikv.RegionCache, client tikv.Client) *RegionBatchRequestSender {
func NewRegionBatchRequestSender(cache *RegionCache, client Client) *RegionBatchRequestSender {
return &RegionBatchRequestSender{
RegionRequestSender: tikv.NewRegionRequestSender(cache, client),
RegionRequestSender: NewRegionRequestSender(cache, client),
}
}
func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []copTaskAndRPCContext, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) {
// use the first ctx to send request, because every ctx has same address.
// SendReqToAddr sends a batch cop request to the address specified by rpcCtx
func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *RPCContext, regionInfos []RegionInfo, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) {
cancel = func() {}
rpcCtx := ctxs[0].ctx
if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
return nil, false, cancel, errors.Trace(e)
}
ctx := bo.GetCtx()
if rawHook := ctx.Value(tikv.RPCCancellerCtxKey{}); rawHook != nil {
ctx, cancel = rawHook.(*tikv.RPCCanceller).WithCancel(ctx)
if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil {
ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx)
}
start := time.Now()
resp, err = ss.GetClient().SendRequest(ctx, rpcCtx.Addr, req, timout)
if ss.Stats != nil {
tikv.RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start))
RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start))
}
if err != nil {
cancel()
ss.SetRPCError(err)
e := ss.onSendFail(bo, ctxs, err)
e := ss.onSendFailForBatchRegions(bo, rpcCtx, regionInfos, err)
if e != nil {
return nil, false, func() {}, errors.Trace(e)
}
@@ -67,30 +74,25 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []co
return
}
func (ss *RegionBatchRequestSender) onSendFail(bo *Backoffer, ctxs []copTaskAndRPCContext, err error) error {
func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx *RPCContext, regionInfos []RegionInfo, err error) error {
// If it failed because the context is cancelled by ourself, don't retry.
if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
return errors.Trace(err)
} else if atomic.LoadUint32(&tikv.ShuttingDown) > 0 {
} else if atomic.LoadUint32(&ShuttingDown) > 0 {
return tikverr.ErrTiDBShuttingDown
}
for _, failedCtx := range ctxs {
ctx := failedCtx.ctx
if ctx.Meta != nil {
// The reload region param is always true because every time we retry, we must
// re-build the ranges and re-create the batch sender; as a result, the length of "failStores"
// will change. If tiflash has more than two replicas, the "reload region" would always be false.
// Since batch cop and mpp have relatively low qps, it's reasonable to reload every time
// an io error is met.
ss.GetRegionCache().OnSendFail(bo.TiKVBackoffer(), ctx, true, err)
}
}
// The reload region param is always true because every time we retry, we must
// re-build the ranges and re-create the batch sender; as a result, the length of "failStores"
// will change. If tiflash has more than two replicas, the "reload region" would always be false.
// Since batch cop and mpp have relatively low qps, it's reasonable to reload every time
// an io error is met.
ss.GetRegionCache().OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)
// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
// TODO: the number of retries should be limited, since the region may be unavailable
// when some unrecoverable disaster happens.
err = bo.Backoff(tikv.BoTiFlashRPC, errors.Errorf("send tikv request error: %v, ctxs: %v, try next peer later", err, ctxs))
err = bo.Backoff(BoTiFlashRPC, errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos))
return errors.Trace(err)
}

View File

@@ -112,6 +112,15 @@ func (r *RegionStore) accessStore(mode AccessMode, idx AccessIndex) (int, *Store
return sidx, r.stores[sidx]
}
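// getAccessIndex returns the access index of the given store in this region's access list for the given
// mode, or -1 if the store does not serve this region in that mode.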
func (r *RegionStore) getAccessIndex(mode AccessMode, store *Store) AccessIndex {
for index, sidx := range r.accessIndex[mode] {
if r.stores[sidx].storeID == store.storeID {
return AccessIndex(index)
}
}
return -1
}
func (r *RegionStore) accessStoreNum(mode AccessMode) int {
return len(r.accessIndex[mode])
}
@@ -526,6 +535,40 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe
}, nil
}
// GetAllValidTiFlashStores returns the store IDs of all valid TiFlash stores; the store ID of currentStore is always the first one
func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store) []uint64 {
// set the cap to 2 because a TiFlash table usually has 2 replicas
allStores := make([]uint64, 0, 2)
// make sure currentStore id is always the first in allStores
allStores = append(allStores, currentStore.storeID)
ts := time.Now().Unix()
cachedRegion := c.getCachedRegionWithRLock(id)
if cachedRegion == nil {
return allStores
}
if !cachedRegion.checkRegionCacheTTL(ts) {
return allStores
}
regionStore := cachedRegion.getStore()
currentIndex := regionStore.getAccessIndex(TiFlashOnly, currentStore)
if currentIndex == -1 {
return allStores
}
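// Walk the other TiFlash peers starting right after currentStore's access index, skipping stores whose
// address is pending re-resolution or whose fail epoch no longer matches the epoch recorded for this
// region (i.e. stores that have failed since the region was cached).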
for startOffset := 1; startOffset < regionStore.accessStoreNum(TiFlashOnly); startOffset++ {
accessIdx := AccessIndex((int(currentIndex) + startOffset) % regionStore.accessStoreNum(TiFlashOnly))
storeIdx, store := regionStore.accessStore(TiFlashOnly, accessIdx)
if store.getResolveState() == needCheck {
continue
}
storeFailEpoch := atomic.LoadUint32(&store.epoch)
if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
continue
}
allStores = append(allStores, store.storeID)
}
return allStores
}
// GetTiFlashRPCContext returns the RPCContext for a region that must access a TiFlash store. If it returns nil, the region
// must be out of date and already dropped from the cache, or no TiFlash store was found.
// `loadBalance` is an option. For MPP and batch cop it is pointless and might cause the failed store to be retried repeatedly.
@@ -668,6 +711,64 @@ func (c *RegionCache) findRegionByKey(bo *Backoffer, key []byte, isEndKey bool)
return r, nil
}
// OnSendFailForBatchRegions handles the send-request-failure logic for a batch of regions that are all served by the same store.
func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *Store, regionInfos []RegionInfo, scheduleReload bool, err error) {
metrics.RegionCacheCounterWithSendFail.Add(float64(len(regionInfos)))
if store.storeType != tikvrpc.TiFlash {
logutil.Logger(bo.GetCtx()).Info("Should not reach here, OnSendFailForBatchRegions only support TiFlash")
return
}
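// For every region served by the failing store: when there is a send error, bump the store's fail epoch
// (so its cached regions get refilled) and schedule an address re-resolution; then switch the region's
// TiFlash access index to the next peer and, if requested, schedule a reload of the region itself.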
for _, ri := range regionInfos {
if ri.Meta == nil {
continue
}
r := c.getCachedRegionWithRLock(ri.Region)
if r != nil {
peersNum := len(r.meta.Peers)
if len(ri.Meta.Peers) != peersNum {
logutil.Logger(bo.GetCtx()).Info("retry and refresh current region after send request fail and up/down stores length changed",
zap.Stringer("region", &ri.Region),
zap.Bool("needReload", scheduleReload),
zap.Reflect("oldPeers", ri.Meta.Peers),
zap.Reflect("newPeers", r.meta.Peers),
zap.Error(err))
continue
}
rs := r.getStore()
accessMode := TiFlashOnly
accessIdx := rs.getAccessIndex(accessMode, store)
if accessIdx == -1 {
logutil.Logger(bo.GetCtx()).Warn("can not get access index for region " + ri.Region.String())
continue
}
if err != nil {
storeIdx, s := rs.accessStore(accessMode, accessIdx)
epoch := rs.storeEpochs[storeIdx]
if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) {
logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
}
// schedule a store addr resolve.
s.markNeedCheck(c.notifyCheckCh)
}
// try next peer
rs.switchNextFlashPeer(r, accessIdx)
logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail",
zap.Stringer("region", &ri.Region),
zap.Bool("needReload", scheduleReload),
zap.Error(err))
// force reload region when all known peers in the region have been retried.
if scheduleReload {
r.scheduleReload()
}
}
}
}
// OnSendFail handles send request fail logic.
func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload bool, err error) {
metrics.RegionCacheCounterWithSendFail.Inc()