lightning: fix wrongly retry writing when partial write + "needRescan" (#43364)
Signed-off-by: lance6716 <lance6716@gmail.com>
This commit is contained in:
@ -1367,6 +1367,9 @@ func (local *Backend) executeJob(
|
||||
job.lastRetryableErr = err
|
||||
return nil
|
||||
}
|
||||
if job.stage == needRescan {
|
||||
return nil
|
||||
}
|
||||
|
||||
if job.writeResult == nil || job.writeResult.remainingStartKey == nil {
|
||||
return nil
|
||||
|
||||
@ -1343,10 +1343,115 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
|
||||
// then meet NotLeader error, scanned new region (11,12,13)
|
||||
// repeat above for 11,12,13
|
||||
require.Equal(t, []uint64{1, 2, 3, 11, 12, 13}, apiInvokeRecorder["Write"])
|
||||
// store 12 has a follower busy, so it will break the workflow for region (11, 12, 13)
|
||||
require.Equal(t, []uint64{1, 2, 3, 1, 11, 12, 13, 11}, apiInvokeRecorder["MultiIngest"])
|
||||
}
|
||||
|
||||
func TestPartialWriteIngestErrorWillPanic(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// let lightning meet any error that will call convertStageTo(needRescan)
|
||||
apiInvokeRecorder := map[string][]uint64{}
|
||||
notLeaderResp := &sst.IngestResponse{
|
||||
Error: &errorpb.Error{
|
||||
NotLeader: &errorpb.NotLeader{Leader: &metapb.Peer{StoreId: 11}},
|
||||
}}
|
||||
|
||||
local := &Backend{
|
||||
splitCli: initTestSplitClient3Replica([][]byte{{}, {'c'}}, nil),
|
||||
importClientFactory: &mockImportClientFactory{
|
||||
stores: []*metapb.Store{
|
||||
{Id: 1}, {Id: 2}, {Id: 3},
|
||||
},
|
||||
createClientFn: func(store *metapb.Store) sst.ImportSSTClient {
|
||||
importCli := newMockImportClient()
|
||||
importCli.store = store
|
||||
importCli.apiInvokeRecorder = apiInvokeRecorder
|
||||
if store.Id == 1 {
|
||||
importCli.retry = 1
|
||||
importCli.resp = notLeaderResp
|
||||
}
|
||||
return importCli
|
||||
},
|
||||
},
|
||||
logger: log.L(),
|
||||
writeLimiter: noopStoreWriteLimiter{},
|
||||
bufferPool: membuf.NewPool(),
|
||||
supportMultiIngest: true,
|
||||
tikvCodec: keyspace.CodecV1,
|
||||
}
|
||||
|
||||
db, tmpPath := makePebbleDB(t, nil)
|
||||
_, engineUUID := backend.MakeUUID("ww", 0)
|
||||
engineCtx, cancel2 := context.WithCancel(context.Background())
|
||||
f := &Engine{
|
||||
db: db,
|
||||
UUID: engineUUID,
|
||||
sstDir: tmpPath,
|
||||
ctx: engineCtx,
|
||||
cancel: cancel2,
|
||||
sstMetasChan: make(chan metaOrFlush, 64),
|
||||
keyAdapter: noopKeyAdapter{},
|
||||
logger: log.L(),
|
||||
}
|
||||
err := f.db.Set([]byte("a"), []byte("a"), nil)
|
||||
require.NoError(t, err)
|
||||
err = f.db.Set([]byte("a2"), []byte("a2"), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
jobCh := make(chan *regionJob, 10)
|
||||
|
||||
partialWriteJob := ®ionJob{
|
||||
keyRange: Range{start: []byte("a"), end: []byte("c")},
|
||||
region: &split.RegionInfo{
|
||||
Region: &metapb.Region{
|
||||
Id: 1,
|
||||
Peers: []*metapb.Peer{
|
||||
{Id: 1, StoreId: 1}, {Id: 2, StoreId: 2}, {Id: 3, StoreId: 3},
|
||||
},
|
||||
StartKey: []byte("a"),
|
||||
EndKey: []byte("c"),
|
||||
},
|
||||
Leader: &metapb.Peer{Id: 1, StoreId: 1},
|
||||
},
|
||||
stage: regionScanned,
|
||||
engine: f,
|
||||
// use small regionSplitSize to trigger partial write
|
||||
regionSplitSize: 1,
|
||||
}
|
||||
var jobWg sync.WaitGroup
|
||||
jobWg.Add(1)
|
||||
jobCh <- partialWriteJob
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
jobOutCh := make(chan *regionJob)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
job := <-jobOutCh
|
||||
if job.stage == regionScanned {
|
||||
jobWg.Done()
|
||||
return
|
||||
}
|
||||
require.Fail(t, "job stage %s is not expected", job.stage)
|
||||
}
|
||||
}()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
err := local.startWorker(ctx, jobCh, jobOutCh, &jobWg)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
jobWg.Wait()
|
||||
cancel()
|
||||
wg.Wait()
|
||||
|
||||
require.Equal(t, []uint64{1, 2, 3}, apiInvokeRecorder["Write"])
|
||||
require.Equal(t, []uint64{1}, apiInvokeRecorder["MultiIngest"])
|
||||
}
|
||||
|
||||
// mockGetSizeProperties mocks that 50MB * 20 SST file.
|
||||
func mockGetSizeProperties(log.Logger, *pebble.DB, KeyAdapter) (*sizeProperties, error) {
|
||||
props := newSizeProperties()
|
||||
|
||||
Reference in New Issue
Block a user