lightning: continue this region round and retry on next round when TiKV is busy (#40278)

* finish

Signed-off-by: lance6716 <lance6716@gmail.com>

* fix some comments

Signed-off-by: lance6716 <lance6716@gmail.com>

* fix CI

Signed-off-by: lance6716 <lance6716@gmail.com>

* address comment

Signed-off-by: lance6716 <lance6716@gmail.com>

* fix errdoc

Signed-off-by: lance6716 <lance6716@gmail.com>

* revert a change

Signed-off-by: lance6716 <lance6716@gmail.com>

* add lightning debug log

Signed-off-by: lance6716 <lance6716@gmail.com>

* fix bug

Signed-off-by: lance6716 <lance6716@gmail.com>

* address comment

Signed-off-by: lance6716 <lance6716@gmail.com>

Signed-off-by: lance6716 <lance6716@gmail.com>
Co-authored-by: Weizhen Wang <wangweizhen@pingcap.com>
Authored by lance6716 on 2023-01-12 15:24:33 +08:00; committed by GitHub
parent 22b43ff396
commit aef752a407
4 changed files with 150 additions and 112 deletions
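In short: where Lightning previously slept and re-probed the same region inside Ingest when a TiKV store answered the write-stall check with ServerIsBusy, it now returns early, maps the busy error to a new retryBusyIngest outcome, and leaves the range out of engine.finishedRanges so the next ImportEngine round retries it. A minimal sketch of that control flow, using simplified hypothetical types (rangeJob, ingestOnce, importRound) rather than the real backend structs:

// Sketch only: rangeJob, ingestOnce and importRound are hypothetical stand-ins
// for Lightning's real range/engine types; they only illustrate the retry shape.
package sketch

import "context"

type rangeJob struct{ start, end []byte }

// ingestOnce writes and ingests one range; busy=true mirrors checkWriteStall
// reporting ServerIsBusy from a peer before the real ingest is sent.
func ingestOnce(ctx context.Context, job rangeJob) (busy bool, err error) {
	// ... write KV pairs to the region's peers, probe them, then ingest ...
	return false, nil
}

// importRound mirrors how writeAndIngestPairs now returns nil on a busy store:
// the range is simply not marked finished, so a later round picks it up again.
func importRound(ctx context.Context, jobs []rangeJob, finished map[int]bool) error {
	for i, job := range jobs {
		if finished[i] {
			continue
		}
		busy, err := ingestOnce(ctx, job)
		if err != nil {
			return err
		}
		if busy {
			continue // skip for now; the next round retries this range
		}
		finished[i] = true
	}
	return nil
}

The outer caller keeps invoking importRound until every range is finished or its retry budget runs out, which is roughly the role ImportEngine's retry loop plays for the unfinished ranges here.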

View File

@ -984,6 +984,32 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter {
return newDupDetectIter(ctx, e.db, e.keyAdapter, opts, e.duplicateDB, logger)
}
+func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) {
+opt := &pebble.IterOptions{
+LowerBound: lowerBound,
+UpperBound: upperBound,
+}
+iter := e.newKVIter(context.Background(), opt)
+//nolint: errcheck
+defer iter.Close()
+// Needs seek to first because NewIter returns an iterator that is unpositioned
+hasKey := iter.First()
+if iter.Error() != nil {
+return nil, nil, errors.Annotate(iter.Error(), "failed to read the first key")
+}
+if !hasKey {
+return nil, nil, nil
+}
+firstKey := append([]byte{}, iter.Key()...)
+iter.Last()
+if iter.Error() != nil {
+return nil, nil, errors.Annotate(iter.Error(), "failed to seek to the last key")
+}
+lastKey := append([]byte{}, iter.Key()...)
+return firstKey, lastKey, nil
+}
type sstMeta struct {
path string
minKey []byte
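The helper above centralizes the seek-first/seek-last/copy-keys pattern that the later local.go hunks remove from WriteToTiKV, readAndSplitIntoRange, and writeAndIngestByRange. For reference, a minimal sketch of the calling pattern, where eng is assumed to be an already-populated *Engine and processRange is a hypothetical caller:

// Sketch only: processRange is hypothetical; it shows how callers consume the
// new helper instead of driving a pebble iterator by hand.
func processRange(eng *Engine, start, end []byte) error {
	first, last, err := eng.getFirstAndLastKey(start, end)
	if err != nil {
		return err
	}
	if first == nil {
		// No keys fall inside [start, end): nothing to write for this range.
		return nil
	}
	// first and last are copies of the boundary keys, so they remain valid
	// after the helper has closed its internal iterator.
	_ = last
	return nil
}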

View File

@ -89,3 +89,37 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
},
}), errorEngineClosed)
}
func TestGetFirstAndLastKey(t *testing.T) {
db, tmpPath := makePebbleDB(t, nil)
f := &Engine{
db: db,
sstDir: tmpPath,
}
err := db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
err = db.Set([]byte("c"), []byte("c"), nil)
require.NoError(t, err)
err = db.Set([]byte("e"), []byte("e"), nil)
require.NoError(t, err)
first, last, err := f.getFirstAndLastKey(nil, nil)
require.NoError(t, err)
require.Equal(t, []byte("a"), first)
require.Equal(t, []byte("e"), last)
first, last, err = f.getFirstAndLastKey([]byte("b"), []byte("d"))
require.NoError(t, err)
require.Equal(t, []byte("c"), first)
require.Equal(t, []byte("c"), last)
first, last, err = f.getFirstAndLastKey([]byte("b"), []byte("f"))
require.NoError(t, err)
require.Equal(t, []byte("c"), first)
require.Equal(t, []byte("e"), last)
first, last, err = f.getFirstAndLastKey([]byte("y"), []byte("z"))
require.NoError(t, err)
require.Nil(t, first)
require.Nil(t, last)
}

View File

@ -898,9 +898,15 @@ type rangeStats struct {
totalBytes int64
}
+type tikvWriteResult struct {
+sstMeta []*sst.SSTMeta
+finishedRange Range
+rangeStats rangeStats
+}
// WriteToTiKV writer engine key-value pairs to tikv and return the sst meta generated by tikv.
// we don't need to do cleanup for the pairs written to tikv if encounters an error,
-// tikv will takes the responsibility to do so.
+// tikv will take the responsibility to do so.
func (local *local) WriteToTiKV(
ctx context.Context,
engine *Engine,
@ -908,9 +914,9 @@ func (local *local) WriteToTiKV(
start, end []byte,
regionSplitSize int64,
regionSplitKeys int64,
-) ([]*sst.SSTMeta, Range, rangeStats, error) {
+) (*tikvWriteResult, error) {
failpoint.Inject("WriteToTiKVNotEnoughDiskSpace", func(_ failpoint.Value) {
-failpoint.Return(nil, Range{}, rangeStats{},
+failpoint.Return(nil,
errors.Errorf("The available disk of TiKV (%s) only left %d, and capacity is %d", "", 0, 0))
})
if local.checkTiKVAvaliable {
@ -926,7 +932,7 @@ func (local *local) WriteToTiKV(
// The available disk percent of TiKV
ratio := store.Status.Available * 100 / store.Status.Capacity
if ratio < 10 {
-return nil, Range{}, rangeStats{}, errors.Errorf("The available disk of TiKV (%s) only left %d, and capacity is %d",
+return nil, errors.Errorf("The available disk of TiKV (%s) only left %d, and capacity is %d",
store.Store.Address, store.Status.Available, store.Status.Capacity)
}
}
@ -939,29 +945,20 @@ func (local *local) WriteToTiKV(
}
begin := time.Now()
regionRange := intersectRange(region.Region, Range{start: start, end: end})
-opt := &pebble.IterOptions{LowerBound: regionRange.start, UpperBound: regionRange.end}
-iter := engine.newKVIter(ctx, opt)
-//nolint: errcheck
-defer iter.Close()
stats := rangeStats{}
-iter.First()
-if iter.Error() != nil {
-return nil, Range{}, stats, errors.Annotate(iter.Error(), "failed to read the first key")
+firstKey, lastKey, err := engine.getFirstAndLastKey(regionRange.start, regionRange.end)
+if err != nil {
+return nil, errors.Trace(err)
}
-if !iter.Valid() {
+if firstKey == nil {
log.FromContext(ctx).Info("keys within region is empty, skip ingest", logutil.Key("start", start),
logutil.Key("regionStart", region.Region.StartKey), logutil.Key("end", end),
logutil.Key("regionEnd", region.Region.EndKey))
-return nil, regionRange, stats, nil
+return &tikvWriteResult{sstMeta: nil, finishedRange: regionRange, rangeStats: stats}, nil
}
-firstKey := codec.EncodeBytes([]byte{}, iter.Key())
-iter.Last()
-if iter.Error() != nil {
-return nil, Range{}, stats, errors.Annotate(iter.Error(), "failed to seek to the last key")
-}
-lastKey := codec.EncodeBytes([]byte{}, iter.Key())
+firstKey = codec.EncodeBytes([]byte{}, firstKey)
+lastKey = codec.EncodeBytes([]byte{}, lastKey)
u := uuid.New()
meta := &sst.SSTMeta{
@ -981,12 +978,12 @@ func (local *local) WriteToTiKV(
for _, peer := range region.Region.GetPeers() {
cli, err := local.getImportClient(ctx, peer.StoreId)
if err != nil {
-return nil, Range{}, stats, err
+return nil, errors.Trace(err)
}
wstream, err := cli.Write(ctx)
if err != nil {
-return nil, Range{}, stats, errors.Trace(err)
+return nil, errors.Trace(err)
}
// Bind uuid for this write request
@ -996,7 +993,7 @@ func (local *local) WriteToTiKV(
},
}
if err = wstream.Send(req); err != nil {
-return nil, Range{}, stats, errors.Trace(err)
+return nil, errors.Trace(err)
}
req.Chunk = &sst.WriteRequest_Batch{
Batch: &sst.WriteBatch{
@ -1037,6 +1034,11 @@ func (local *local) WriteToTiKV(
return nil
}
+opt := &pebble.IterOptions{LowerBound: regionRange.start, UpperBound: regionRange.end}
+iter := engine.newKVIter(ctx, opt)
+//nolint: errcheck
+defer iter.Close()
for iter.First(); iter.Valid(); iter.Next() {
kvSize := int64(len(iter.Key()) + len(iter.Value()))
// here we reuse the `*sst.Pair`s to optimize object allocation
@ -1057,7 +1059,7 @@ func (local *local) WriteToTiKV(
if count >= local.batchWriteKVPairs || size >= flushLimit {
if err := flushKVs(); err != nil {
-return nil, Range{}, stats, err
+return nil, errors.Trace(err)
}
count = 0
size = 0
@ -1069,12 +1071,12 @@ func (local *local) WriteToTiKV(
}
if iter.Error() != nil {
-return nil, Range{}, stats, errors.Trace(iter.Error())
+return nil, errors.Trace(iter.Error())
}
if count > 0 {
if err := flushKVs(); err != nil {
-return nil, Range{}, stats, err
+return nil, errors.Trace(err)
}
count = 0
size = 0
@ -1085,10 +1087,10 @@ func (local *local) WriteToTiKV(
for i, wStream := range clients {
resp, closeErr := wStream.CloseAndRecv()
if closeErr != nil {
-return nil, Range{}, stats, errors.Trace(closeErr)
+return nil, errors.Trace(closeErr)
}
if resp.Error != nil {
-return nil, Range{}, stats, errors.New(resp.Error.Message)
+return nil, errors.New(resp.Error.Message)
}
if leaderID == region.Region.Peers[i].GetId() {
leaderPeerMetas = resp.Metas
@ -1101,7 +1103,7 @@ func (local *local) WriteToTiKV(
log.FromContext(ctx).Warn("write to tikv no leader", logutil.Region(region.Region), logutil.Leader(region.Leader),
zap.Uint64("leader_id", leaderID), logutil.SSTMeta(meta),
zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", size))
-return nil, Range{}, stats, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d",
+return nil, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d",
region.Region.Id, leaderID)
}
@ -1123,7 +1125,11 @@ func (local *local) WriteToTiKV(
stats.count = totalCount
stats.totalBytes = totalSize
-return leaderPeerMetas, finishedRange, stats, nil
+return &tikvWriteResult{
+sstMeta: leaderPeerMetas,
+finishedRange: finishedRange,
+rangeStats: stats,
+}, nil
}
func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *split.RegionInfo) (*sst.IngestResponse, error) {
@ -1155,21 +1161,12 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp
}
if local.shouldCheckWriteStall {
-for {
-maybeWriteStall, err := local.checkWriteStall(ctx, region)
-if err != nil {
-return nil, err
-}
-if !maybeWriteStall {
-break
-}
-log.FromContext(ctx).Warn("ingest maybe cause write stall, sleep and retry",
-zap.Duration("duration", writeStallSleepTime))
-select {
-case <-time.After(writeStallSleepTime):
-case <-ctx.Done():
-return nil, errors.Trace(ctx.Err())
-}
+writeStall, resp, err := local.checkWriteStall(ctx, region)
+if err != nil {
+return nil, errors.Trace(err)
+}
+if writeStall {
+return resp, nil
}
}
@ -1181,21 +1178,23 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp
return resp, errors.Trace(err)
}
-func (local *local) checkWriteStall(ctx context.Context, region *split.RegionInfo) (bool, error) {
+func (local *local) checkWriteStall(ctx context.Context, region *split.RegionInfo) (bool, *sst.IngestResponse, error) {
for _, peer := range region.Region.GetPeers() {
cli, err := local.getImportClient(ctx, peer.StoreId)
if err != nil {
-return false, errors.Trace(err)
+return false, nil, errors.Trace(err)
}
// currently we use empty MultiIngestRequest to check if TiKV is busy.
// If in future the rate limit feature contains more metrics we can switch to use it.
resp, err := cli.MultiIngest(ctx, &sst.MultiIngestRequest{})
if err != nil {
-return false, errors.Trace(err)
+return false, nil, errors.Trace(err)
}
if resp.Error != nil && resp.Error.ServerIsBusy != nil {
-return true, nil
+return true, resp, nil
}
}
-return false, nil
+return false, nil, nil
}
func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range {
@ -1234,29 +1233,14 @@ func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit
}
func (local *local) readAndSplitIntoRange(ctx context.Context, engine *Engine, regionSplitSize int64, regionSplitKeys int64) ([]Range, error) {
-iter := engine.newKVIter(ctx, &pebble.IterOptions{})
-//nolint: errcheck
-defer iter.Close()
-iterError := func(e string) error {
-err := iter.Error()
-if err != nil {
-return errors.Annotate(err, e)
-}
-return errors.New(e)
+firstKey, lastKey, err := engine.getFirstAndLastKey(nil, nil)
+if err != nil {
+return nil, err
}
+if firstKey == nil {
+return nil, errors.New("could not find first pair")
+}
-var firstKey, lastKey []byte
-if iter.First() {
-firstKey = append([]byte{}, iter.Key()...)
-} else {
-return nil, iterError("could not find first pair")
-}
-if iter.Last() {
-lastKey = append([]byte{}, iter.Key()...)
-} else {
-return nil, iterError("could not find last pair")
-}
endKey := nextKey(lastKey)
engineFileTotalSize := engine.TotalSize.Load()
@ -1286,45 +1270,27 @@ func (local *local) readAndSplitIntoRange(ctx context.Context, engine *Engine, r
}
func (local *local) writeAndIngestByRange(
-ctxt context.Context,
+ctx context.Context,
engine *Engine,
start, end []byte,
regionSplitSize int64,
regionSplitKeys int64,
) error {
-ito := &pebble.IterOptions{
-LowerBound: start,
-UpperBound: end,
+pairStart, pairEnd, err := engine.getFirstAndLastKey(start, end)
+if err != nil {
+return err
}
-iter := engine.newKVIter(ctxt, ito)
-//nolint: errcheck
-defer iter.Close()
-// Needs seek to first because NewIter returns an iterator that is unpositioned
-hasKey := iter.First()
-if iter.Error() != nil {
-return errors.Annotate(iter.Error(), "failed to read the first key")
-}
-if !hasKey {
-log.FromContext(ctxt).Info("There is no pairs in iterator",
+if pairStart == nil {
+log.FromContext(ctx).Info("There is no pairs in iterator",
logutil.Key("start", start),
logutil.Key("end", end))
engine.finishedRanges.add(Range{start: start, end: end})
return nil
}
-pairStart := append([]byte{}, iter.Key()...)
-iter.Last()
-if iter.Error() != nil {
-return errors.Annotate(iter.Error(), "failed to seek to the last key")
-}
-pairEnd := append([]byte{}, iter.Key()...)
var regions []*split.RegionInfo
-var err error
-ctx, cancel := context.WithCancel(ctxt)
-defer cancel()
-WriteAndIngest:
+ScanWriteIngest:
for retry := 0; retry < maxRetryTimes; {
if retry != 0 {
select {
@ -1340,7 +1306,7 @@ WriteAndIngest:
log.FromContext(ctx).Warn("scan region failed", log.ShortError(err), zap.Int("region_len", len(regions)),
logutil.Key("startKey", startKey), logutil.Key("endKey", endKey), zap.Int("retry", retry))
retry++
-continue WriteAndIngest
+continue ScanWriteIngest
}
for _, region := range regions {
@ -1365,7 +1331,7 @@ WriteAndIngest:
}
log.FromContext(ctx).Info("retry write and ingest kv pairs", logutil.Key("startKey", pairStart),
logutil.Key("endKey", end), log.ShortError(err), zap.Int("retry", retry))
-continue WriteAndIngest
+continue ScanWriteIngest
}
}
@ -1381,6 +1347,7 @@ const (
retryNone retryType = iota
retryWrite
retryIngest
+retryBusyIngest
)
func (local *local) isRetryableImportTiKVError(err error) bool {
@ -1396,6 +1363,11 @@ func (local *local) isRetryableImportTiKVError(err error) bool {
return common.IsRetryableError(err)
}
+// writeAndIngestPairs writes the kv pairs in the range [start, end) to the peers
+// of the region, and then sends the ingest command to do RocksDB ingest.
+// When it returns nil, it does not mean the whole task succeeded. The successful ranges are
+// recorded in engine.finishedRanges.
+// TODO: regionSplitSize and regionSplitKeys can be a member of Engine, no need to pass them in every function.
func (local *local) writeAndIngestPairs(
ctx context.Context,
engine *Engine,
@ -1405,13 +1377,10 @@ func (local *local) writeAndIngestPairs(
regionSplitKeys int64,
) error {
var err error
+var writeResult *tikvWriteResult
loopWrite:
for i := 0; i < maxRetryTimes; i++ {
-var metas []*sst.SSTMeta
-var finishedRange Range
-var rangeStats rangeStats
-metas, finishedRange, rangeStats, err = local.WriteToTiKV(ctx, engine, region, start, end, regionSplitSize, regionSplitKeys)
+writeResult, err = local.WriteToTiKV(ctx, engine, region, start, end, regionSplitSize, regionSplitKeys)
if err != nil {
if !local.isRetryableImportTiKVError(err) {
return err
@ -1420,6 +1389,7 @@ loopWrite:
log.FromContext(ctx).Warn("write to tikv failed", log.ShortError(err), zap.Int("retry", i))
continue loopWrite
}
+metas, finishedRange, rangeStats := writeResult.sstMeta, writeResult.finishedRange, writeResult.rangeStats
if len(metas) == 0 {
return nil
@ -1486,6 +1456,7 @@ loopWrite:
// ingest next meta
break
}
switch retryTy {
case retryNone:
log.FromContext(ctx).Warn("ingest failed noretry", log.ShortError(err), logutil.SSTMetas(ingestMetas),
@ -1498,25 +1469,30 @@ loopWrite:
case retryIngest:
region = newRegion
continue
+case retryBusyIngest:
+log.FromContext(ctx).Warn("meet tikv busy when ingest", log.ShortError(err), logutil.SSTMetas(ingestMetas),
+logutil.Region(region.Region))
+// ImportEngine will continue on this unfinished range
+return nil
}
}
}
-if err != nil {
-log.FromContext(ctx).Warn("write and ingest region, will retry import full range", log.ShortError(err),
-logutil.Region(region.Region), logutil.Key("start", start),
-logutil.Key("end", end))
-} else {
+if err == nil {
engine.importedKVSize.Add(rangeStats.totalBytes)
engine.importedKVCount.Add(rangeStats.count)
engine.finishedRanges.add(finishedRange)
if local.metrics != nil {
local.metrics.BytesCounter.WithLabelValues(metric.BytesStateImported).Add(float64(rangeStats.totalBytes))
}
+return nil
}
+log.FromContext(ctx).Warn("write and ingest region, will retry import full range", log.ShortError(err),
+logutil.Region(region.Region), logutil.Key("start", start),
+logutil.Key("end", end))
+return errors.Trace(err)
+}
-return errors.Trace(err)
-}
@ -2015,7 +1991,7 @@ func (local *local) isIngestRetryable(
}
return retryWrite, newRegion, common.ErrKVRaftProposalDropped.GenWithStack(errPb.GetMessage())
case errPb.ServerIsBusy != nil:
-return retryNone, nil, common.ErrKVServerIsBusy.GenWithStack(errPb.GetMessage())
+return retryBusyIngest, nil, common.ErrKVServerIsBusy.GenWithStack(errPb.GetMessage())
case errPb.RegionNotFound != nil:
return retryNone, nil, common.ErrKVRegionNotFound.GenWithStack(errPb.GetMessage())
case errPb.ReadIndexNotReady != nil:

View File

@ -1313,6 +1313,8 @@ func TestCheckPeersBusy(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []uint64{11, 12, 13, 21, 22, 23}, apiInvokeRecorder["Write"])
-// store 12 has a follower busy, so it will cause region peers (11, 12, 13) retry once
-require.Equal(t, []uint64{11, 12, 11, 12, 13, 11, 21, 22, 23, 21}, apiInvokeRecorder["MultiIngest"])
+// store 12 has a follower busy, so it will break the workflow for region (11, 12, 13)
+require.Equal(t, []uint64{11, 12, 21, 22, 23, 21}, apiInvokeRecorder["MultiIngest"])
+// region (11, 12, 13) has key range ["a", "b"), it's not finished.
+require.Equal(t, []Range{{start: []byte("b"), end: []byte("c")}}, f.finishedRanges.ranges)
}