From aef752a407aca3cd379f9941cd51f475abea9bcb Mon Sep 17 00:00:00 2001
From: lance6716
Date: Thu, 12 Jan 2023 15:24:33 +0800
Subject: [PATCH] lightning: continue this region round and retry on next round when TiKV is busy (#40278)

* finish

Signed-off-by: lance6716

* fix some comments

Signed-off-by: lance6716

* fix CI

Signed-off-by: lance6716

* address comment

Signed-off-by: lance6716

* fix errdoc

Signed-off-by: lance6716

* revert a change

Signed-off-by: lance6716

* add lightning debug log

Signed-off-by: lance6716

* fix bug

Signed-off-by: lance6716

* address comment

Signed-off-by: lance6716

Signed-off-by: lance6716
Co-authored-by: Weizhen Wang
---
 br/pkg/lightning/backend/local/engine.go      |  26 +++
 br/pkg/lightning/backend/local/engine_test.go |  34 +++
 br/pkg/lightning/backend/local/local.go       | 196 ++++++++----------
 br/pkg/lightning/backend/local/local_test.go  |   6 +-
 4 files changed, 150 insertions(+), 112 deletions(-)

diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go
index 32d0046542..a30f76c46a 100644
--- a/br/pkg/lightning/backend/local/engine.go
+++ b/br/pkg/lightning/backend/local/engine.go
@@ -984,6 +984,32 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter {
 	return newDupDetectIter(ctx, e.db, e.keyAdapter, opts, e.duplicateDB, logger)
 }
 
+func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) {
+	opt := &pebble.IterOptions{
+		LowerBound: lowerBound,
+		UpperBound: upperBound,
+	}
+
+	iter := e.newKVIter(context.Background(), opt)
+	//nolint: errcheck
+	defer iter.Close()
+	// Needs seek to first because NewIter returns an iterator that is unpositioned
+	hasKey := iter.First()
+	if iter.Error() != nil {
+		return nil, nil, errors.Annotate(iter.Error(), "failed to read the first key")
+	}
+	if !hasKey {
+		return nil, nil, nil
+	}
+	firstKey := append([]byte{}, iter.Key()...)
+	iter.Last()
+	if iter.Error() != nil {
+		return nil, nil, errors.Annotate(iter.Error(), "failed to seek to the last key")
+	}
+	lastKey := append([]byte{}, iter.Key()...)
+	return firstKey, lastKey, nil
+}
+
 type sstMeta struct {
 	path   string
 	minKey []byte
diff --git a/br/pkg/lightning/backend/local/engine_test.go b/br/pkg/lightning/backend/local/engine_test.go
index eae0225bb5..2b935f30bb 100644
--- a/br/pkg/lightning/backend/local/engine_test.go
+++ b/br/pkg/lightning/backend/local/engine_test.go
@@ -89,3 +89,37 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
 		},
 	}), errorEngineClosed)
 }
+
+func TestGetFirstAndLastKey(t *testing.T) {
+	db, tmpPath := makePebbleDB(t, nil)
+	f := &Engine{
+		db:     db,
+		sstDir: tmpPath,
+	}
+	err := db.Set([]byte("a"), []byte("a"), nil)
+	require.NoError(t, err)
+	err = db.Set([]byte("c"), []byte("c"), nil)
+	require.NoError(t, err)
+	err = db.Set([]byte("e"), []byte("e"), nil)
+	require.NoError(t, err)
+
+	first, last, err := f.getFirstAndLastKey(nil, nil)
+	require.NoError(t, err)
+	require.Equal(t, []byte("a"), first)
+	require.Equal(t, []byte("e"), last)
+
+	first, last, err = f.getFirstAndLastKey([]byte("b"), []byte("d"))
+	require.NoError(t, err)
+	require.Equal(t, []byte("c"), first)
+	require.Equal(t, []byte("c"), last)
+
+	first, last, err = f.getFirstAndLastKey([]byte("b"), []byte("f"))
+	require.NoError(t, err)
+	require.Equal(t, []byte("c"), first)
+	require.Equal(t, []byte("e"), last)
+
+	first, last, err = f.getFirstAndLastKey([]byte("y"), []byte("z"))
+	require.NoError(t, err)
+	require.Nil(t, first)
+	require.Nil(t, last)
+}
diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index d1df848bef..6ceb269f80 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -898,9 +898,15 @@ type rangeStats struct {
 	totalBytes int64
 }
 
+type tikvWriteResult struct {
+	sstMeta       []*sst.SSTMeta
+	finishedRange Range
+	rangeStats    rangeStats
+}
+
 // WriteToTiKV writer engine key-value pairs to tikv and return the sst meta generated by tikv.
 // we don't need to do cleanup for the pairs written to tikv if encounters an error,
-// tikv will takes the responsibility to do so.
+// tikv will take the responsibility to do so.
 func (local *local) WriteToTiKV(
 	ctx context.Context,
 	engine *Engine,
@@ -908,9 +914,9 @@ func (local *local) WriteToTiKV(
 	start, end []byte,
 	regionSplitSize int64,
 	regionSplitKeys int64,
-) ([]*sst.SSTMeta, Range, rangeStats, error) {
+) (*tikvWriteResult, error) {
 	failpoint.Inject("WriteToTiKVNotEnoughDiskSpace", func(_ failpoint.Value) {
-		failpoint.Return(nil, Range{}, rangeStats{},
+		failpoint.Return(nil,
 			errors.Errorf("The available disk of TiKV (%s) only left %d, and capacity is %d", "", 0, 0))
 	})
 	if local.checkTiKVAvaliable {
@@ -926,7 +932,7 @@ func (local *local) WriteToTiKV(
 			// The available disk percent of TiKV
 			ratio := store.Status.Available * 100 / store.Status.Capacity
 			if ratio < 10 {
-				return nil, Range{}, rangeStats{}, errors.Errorf("The available disk of TiKV (%s) only left %d, and capacity is %d",
+				return nil, errors.Errorf("The available disk of TiKV (%s) only left %d, and capacity is %d",
 					store.Store.Address, store.Status.Available, store.Status.Capacity)
 			}
 		}
@@ -939,29 +945,20 @@ func (local *local) WriteToTiKV(
 	}
 	begin := time.Now()
 	regionRange := intersectRange(region.Region, Range{start: start, end: end})
-	opt := &pebble.IterOptions{LowerBound: regionRange.start, UpperBound: regionRange.end}
-	iter := engine.newKVIter(ctx, opt)
-	//nolint: errcheck
-	defer iter.Close()
-
 	stats := rangeStats{}
 
-	iter.First()
-	if iter.Error() != nil {
-		return nil, Range{}, stats, errors.Annotate(iter.Error(), "failed to read the first key")
+	firstKey, lastKey, err := engine.getFirstAndLastKey(regionRange.start, regionRange.end)
+	if err != nil {
+		return nil, errors.Trace(err)
 	}
-	if !iter.Valid() {
+	if firstKey == nil {
 		log.FromContext(ctx).Info("keys within region is empty, skip ingest", logutil.Key("start", start),
 			logutil.Key("regionStart", region.Region.StartKey), logutil.Key("end", end),
 			logutil.Key("regionEnd", region.Region.EndKey))
-		return nil, regionRange, stats, nil
+		return &tikvWriteResult{sstMeta: nil, finishedRange: regionRange, rangeStats: stats}, nil
 	}
-	firstKey := codec.EncodeBytes([]byte{}, iter.Key())
-	iter.Last()
-	if iter.Error() != nil {
-		return nil, Range{}, stats, errors.Annotate(iter.Error(), "failed to seek to the last key")
-	}
-	lastKey := codec.EncodeBytes([]byte{}, iter.Key())
+	firstKey = codec.EncodeBytes([]byte{}, firstKey)
+	lastKey = codec.EncodeBytes([]byte{}, lastKey)
 
 	u := uuid.New()
 	meta := &sst.SSTMeta{
@@ -981,12 +978,12 @@ func (local *local) WriteToTiKV(
 	for _, peer := range region.Region.GetPeers() {
 		cli, err := local.getImportClient(ctx, peer.StoreId)
 		if err != nil {
-			return nil, Range{}, stats, err
+			return nil, errors.Trace(err)
 		}
 
 		wstream, err := cli.Write(ctx)
 		if err != nil {
-			return nil, Range{}, stats, errors.Trace(err)
+			return nil, errors.Trace(err)
 		}
 
 		// Bind uuid for this write request
@@ -996,7 +993,7 @@ func (local *local) WriteToTiKV(
 			},
 		}
 		if err = wstream.Send(req); err != nil {
-			return nil, Range{}, stats, errors.Trace(err)
+			return nil, errors.Trace(err)
 		}
 		req.Chunk = &sst.WriteRequest_Batch{
 			Batch: &sst.WriteBatch{
@@ -1037,6 +1034,11 @@ func (local *local) WriteToTiKV(
 		return nil
 	}
 
+	opt := &pebble.IterOptions{LowerBound: regionRange.start, UpperBound: regionRange.end}
+	iter := engine.newKVIter(ctx, opt)
+	//nolint: errcheck
+	defer iter.Close()
+
 	for iter.First(); iter.Valid(); iter.Next() {
 		kvSize := int64(len(iter.Key()) + len(iter.Value()))
 		// here we reuse the `*sst.Pair`s to optimize object allocation
@@ -1057,7 +1059,7 @@ func (local *local) WriteToTiKV(
 
 		if count >= local.batchWriteKVPairs || size >= flushLimit {
 			if err := flushKVs(); err != nil {
-				return nil, Range{}, stats, err
+				return nil, errors.Trace(err)
 			}
 			count = 0
 			size = 0
@@ -1069,12 +1071,12 @@ func (local *local) WriteToTiKV(
 	}
 
 	if iter.Error() != nil {
-		return nil, Range{}, stats, errors.Trace(iter.Error())
+		return nil, errors.Trace(iter.Error())
 	}
 
 	if count > 0 {
 		if err := flushKVs(); err != nil {
-			return nil, Range{}, stats, err
+			return nil, errors.Trace(err)
 		}
 		count = 0
 		size = 0
@@ -1085,10 +1087,10 @@ func (local *local) WriteToTiKV(
 	for i, wStream := range clients {
 		resp, closeErr := wStream.CloseAndRecv()
 		if closeErr != nil {
-			return nil, Range{}, stats, errors.Trace(closeErr)
+			return nil, errors.Trace(closeErr)
 		}
 		if resp.Error != nil {
-			return nil, Range{}, stats, errors.New(resp.Error.Message)
+			return nil, errors.New(resp.Error.Message)
 		}
 		if leaderID == region.Region.Peers[i].GetId() {
 			leaderPeerMetas = resp.Metas
@@ -1101,7 +1103,7 @@ func (local *local) WriteToTiKV(
 		log.FromContext(ctx).Warn("write to tikv no leader", logutil.Region(region.Region), logutil.Leader(region.Leader),
 			zap.Uint64("leader_id", leaderID), logutil.SSTMeta(meta),
 			zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", size))
-		return nil, Range{}, stats, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d",
+		return nil, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d",
 			region.Region.Id, leaderID)
 	}
 
@@ -1123,7 +1125,11 @@ func (local *local) WriteToTiKV(
 	stats.count = totalCount
 	stats.totalBytes = totalSize
 
-	return leaderPeerMetas, finishedRange, stats, nil
+	return &tikvWriteResult{
+		sstMeta:       leaderPeerMetas,
+		finishedRange: finishedRange,
+		rangeStats:    stats,
+	}, nil
 }
 
 func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *split.RegionInfo) (*sst.IngestResponse, error) {
@@ -1155,21 +1161,12 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp
 	}
 
 	if local.shouldCheckWriteStall {
-		for {
-			maybeWriteStall, err := local.checkWriteStall(ctx, region)
-			if err != nil {
-				return nil, err
-			}
-			if !maybeWriteStall {
-				break
-			}
-			log.FromContext(ctx).Warn("ingest maybe cause write stall, sleep and retry",
-				zap.Duration("duration", writeStallSleepTime))
-			select {
-			case <-time.After(writeStallSleepTime):
-			case <-ctx.Done():
-				return nil, errors.Trace(ctx.Err())
-			}
+		writeStall, resp, err := local.checkWriteStall(ctx, region)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		if writeStall {
+			return resp, nil
 		}
 	}
 
@@ -1181,21 +1178,23 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp
 	return resp, errors.Trace(err)
 }
 
-func (local *local) checkWriteStall(ctx context.Context, region *split.RegionInfo) (bool, error) {
+func (local *local) checkWriteStall(ctx context.Context, region *split.RegionInfo) (bool, *sst.IngestResponse, error) {
 	for _, peer := range region.Region.GetPeers() {
 		cli, err := local.getImportClient(ctx, peer.StoreId)
 		if err != nil {
-			return false, errors.Trace(err)
+			return false, nil, errors.Trace(err)
 		}
+		// Currently we use an empty MultiIngestRequest to check whether TiKV is busy.
+		// If the rate limit feature exposes more metrics in the future, we can switch to using it.
 		resp, err := cli.MultiIngest(ctx, &sst.MultiIngestRequest{})
 		if err != nil {
-			return false, errors.Trace(err)
+			return false, nil, errors.Trace(err)
 		}
 		if resp.Error != nil && resp.Error.ServerIsBusy != nil {
-			return true, nil
+			return true, resp, nil
 		}
 	}
-	return false, nil
+	return false, nil, nil
 }
 
 func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range {
@@ -1234,29 +1233,14 @@ func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit
 }
 
 func (local *local) readAndSplitIntoRange(ctx context.Context, engine *Engine, regionSplitSize int64, regionSplitKeys int64) ([]Range, error) {
-	iter := engine.newKVIter(ctx, &pebble.IterOptions{})
-	//nolint: errcheck
-	defer iter.Close()
-
-	iterError := func(e string) error {
-		err := iter.Error()
-		if err != nil {
-			return errors.Annotate(err, e)
-		}
-		return errors.New(e)
+	firstKey, lastKey, err := engine.getFirstAndLastKey(nil, nil)
+	if err != nil {
+		return nil, err
+	}
+	if firstKey == nil {
+		return nil, errors.New("could not find first pair")
 	}
 
-	var firstKey, lastKey []byte
-	if iter.First() {
-		firstKey = append([]byte{}, iter.Key()...)
-	} else {
-		return nil, iterError("could not find first pair")
-	}
-	if iter.Last() {
-		lastKey = append([]byte{}, iter.Key()...)
-	} else {
-		return nil, iterError("could not find last pair")
-	}
 	endKey := nextKey(lastKey)
 
 	engineFileTotalSize := engine.TotalSize.Load()
@@ -1286,45 +1270,27 @@ func (local *local) readAndSplitIntoRange(ctx context.Context, engine *Engine, r
 }
 
 func (local *local) writeAndIngestByRange(
-	ctxt context.Context,
+	ctx context.Context,
 	engine *Engine,
 	start, end []byte,
 	regionSplitSize int64,
 	regionSplitKeys int64,
 ) error {
-	ito := &pebble.IterOptions{
-		LowerBound: start,
-		UpperBound: end,
+	pairStart, pairEnd, err := engine.getFirstAndLastKey(start, end)
+	if err != nil {
+		return err
 	}
-
-	iter := engine.newKVIter(ctxt, ito)
-	//nolint: errcheck
-	defer iter.Close()
-	// Needs seek to first because NewIter returns an iterator that is unpositioned
-	hasKey := iter.First()
-	if iter.Error() != nil {
-		return errors.Annotate(iter.Error(), "failed to read the first key")
-	}
-	if !hasKey {
-		log.FromContext(ctxt).Info("There is no pairs in iterator",
+	if pairStart == nil {
+		log.FromContext(ctx).Info("There is no pairs in iterator",
 			logutil.Key("start", start),
 			logutil.Key("end", end))
 		engine.finishedRanges.add(Range{start: start, end: end})
 		return nil
 	}
-	pairStart := append([]byte{}, iter.Key()...)
-	iter.Last()
-	if iter.Error() != nil {
-		return errors.Annotate(iter.Error(), "failed to seek to the last key")
-	}
-	pairEnd := append([]byte{}, iter.Key()...)
 
 	var regions []*split.RegionInfo
-	var err error
-	ctx, cancel := context.WithCancel(ctxt)
-	defer cancel()
 
-WriteAndIngest:
+ScanWriteIngest:
 	for retry := 0; retry < maxRetryTimes; {
 		if retry != 0 {
 			select {
@@ -1340,7 +1306,7 @@ WriteAndIngest:
 			log.FromContext(ctx).Warn("scan region failed", log.ShortError(err), zap.Int("region_len", len(regions)),
 				logutil.Key("startKey", startKey), logutil.Key("endKey", endKey), zap.Int("retry", retry))
 			retry++
-			continue WriteAndIngest
+			continue ScanWriteIngest
 		}
 
 		for _, region := range regions {
@@ -1365,7 +1331,7 @@ WriteAndIngest:
 			}
 			log.FromContext(ctx).Info("retry write and ingest kv pairs", logutil.Key("startKey", pairStart),
 				logutil.Key("endKey", end), log.ShortError(err), zap.Int("retry", retry))
-			continue WriteAndIngest
+			continue ScanWriteIngest
 		}
 	}
 
@@ -1381,6 +1347,7 @@ const (
 	retryNone retryType = iota
 	retryWrite
 	retryIngest
+	retryBusyIngest
 )
 
 func (local *local) isRetryableImportTiKVError(err error) bool {
@@ -1396,6 +1363,11 @@ func (local *local) isRetryableImportTiKVError(err error) bool {
 	return common.IsRetryableError(err)
 }
 
+// writeAndIngestPairs writes the kv pairs in the range [start, end) to the peers
+// of the region, and then sends the ingest command to do the RocksDB ingest.
+// A nil return value does not mean the whole task succeeded; the successfully
+// imported ranges are recorded in engine.finishedRanges.
+// TODO: regionSplitSize and regionSplitKeys can be members of Engine, no need to pass them to every function.
 func (local *local) writeAndIngestPairs(
 	ctx context.Context,
 	engine *Engine,
@@ -1405,13 +1377,10 @@ func (local *local) writeAndIngestPairs(
 	regionSplitKeys int64,
 ) error {
 	var err error
-
+	var writeResult *tikvWriteResult
 loopWrite:
 	for i := 0; i < maxRetryTimes; i++ {
-		var metas []*sst.SSTMeta
-		var finishedRange Range
-		var rangeStats rangeStats
-		metas, finishedRange, rangeStats, err = local.WriteToTiKV(ctx, engine, region, start, end, regionSplitSize, regionSplitKeys)
+		writeResult, err = local.WriteToTiKV(ctx, engine, region, start, end, regionSplitSize, regionSplitKeys)
 		if err != nil {
 			if !local.isRetryableImportTiKVError(err) {
 				return err
@@ -1420,6 +1389,7 @@ loopWrite:
 			log.FromContext(ctx).Warn("write to tikv failed", log.ShortError(err), zap.Int("retry", i))
 			continue loopWrite
 		}
+		metas, finishedRange, rangeStats := writeResult.sstMeta, writeResult.finishedRange, writeResult.rangeStats
 
 		if len(metas) == 0 {
 			return nil
@@ -1486,6 +1456,7 @@ loopWrite:
 					// ingest next meta
 					break
 				}
+
 				switch retryTy {
 				case retryNone:
 					log.FromContext(ctx).Warn("ingest failed noretry", log.ShortError(err), logutil.SSTMetas(ingestMetas),
@@ -1498,25 +1469,30 @@ loopWrite:
 				case retryIngest:
 					region = newRegion
 					continue
+				case retryBusyIngest:
+					log.FromContext(ctx).Warn("meet tikv busy when ingest", log.ShortError(err), logutil.SSTMetas(ingestMetas),
+						logutil.Region(region.Region))
+					// ImportEngine will continue on this unfinished range
+					return nil
 				}
 			}
 		}
 
-		if err != nil {
-			log.FromContext(ctx).Warn("write and ingest region, will retry import full range", log.ShortError(err),
-				logutil.Region(region.Region), logutil.Key("start", start),
-				logutil.Key("end", end))
-		} else {
+		if err == nil {
 			engine.importedKVSize.Add(rangeStats.totalBytes)
 			engine.importedKVCount.Add(rangeStats.count)
 			engine.finishedRanges.add(finishedRange)
 			if local.metrics != nil {
 				local.metrics.BytesCounter.WithLabelValues(metric.BytesStateImported).Add(float64(rangeStats.totalBytes))
 			}
+			return nil
 		}
+
+		log.FromContext(ctx).Warn("write and ingest region, will retry import full range", log.ShortError(err),
retry import full range", log.ShortError(err), + logutil.Region(region.Region), logutil.Key("start", start), + logutil.Key("end", end)) return errors.Trace(err) } - return errors.Trace(err) } @@ -2015,7 +1991,7 @@ func (local *local) isIngestRetryable( } return retryWrite, newRegion, common.ErrKVRaftProposalDropped.GenWithStack(errPb.GetMessage()) case errPb.ServerIsBusy != nil: - return retryNone, nil, common.ErrKVServerIsBusy.GenWithStack(errPb.GetMessage()) + return retryBusyIngest, nil, common.ErrKVServerIsBusy.GenWithStack(errPb.GetMessage()) case errPb.RegionNotFound != nil: return retryNone, nil, common.ErrKVRegionNotFound.GenWithStack(errPb.GetMessage()) case errPb.ReadIndexNotReady != nil: diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 22d6403d0a..a485bdecac 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -1313,6 +1313,8 @@ func TestCheckPeersBusy(t *testing.T) { require.NoError(t, err) require.Equal(t, []uint64{11, 12, 13, 21, 22, 23}, apiInvokeRecorder["Write"]) - // store 12 has a follower busy, so it will cause region peers (11, 12, 13) retry once - require.Equal(t, []uint64{11, 12, 11, 12, 13, 11, 21, 22, 23, 21}, apiInvokeRecorder["MultiIngest"]) + // store 12 has a follower busy, so it will break the workflow for region (11, 12, 13) + require.Equal(t, []uint64{11, 12, 21, 22, 23, 21}, apiInvokeRecorder["MultiIngest"]) + // region (11, 12, 13) has key range ["a", "b"), it's not finished. + require.Equal(t, []Range{{start: []byte("b"), end: []byte("c")}}, f.finishedRanges.ranges) }