txn: Add sortedRetrievers to perform read from custom retrievers (#27675)
This commit is contained in:
@ -16,7 +16,10 @@ package txn
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"sort"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
)
|
||||
|
||||
@ -62,3 +65,147 @@ func (s *RangedKVRetriever) Intersect(startKey, endKey kv.Key) *RangedKVRetrieve
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ScanCurrentRange scans the retriever for current range
|
||||
func (s *RangedKVRetriever) ScanCurrentRange(reverse bool) (kv.Iterator, error) {
|
||||
if !s.Valid() {
|
||||
return nil, errors.New("retriever is invalid")
|
||||
}
|
||||
|
||||
startKey, endKey := s.StartKey, s.EndKey
|
||||
if !reverse {
|
||||
return s.Iter(startKey, endKey)
|
||||
}
|
||||
|
||||
iter, err := s.IterReverse(endKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(startKey) > 0 {
|
||||
iter = newLowerBoundReverseIter(iter, startKey)
|
||||
}
|
||||
|
||||
return iter, nil
|
||||
}
|
||||
|
||||
type sortedRetrievers []*RangedKVRetriever
|
||||
|
||||
func (retrievers sortedRetrievers) TryGet(ctx context.Context, k kv.Key) (bool, []byte, error) {
|
||||
if len(retrievers) == 0 {
|
||||
return false, nil, nil
|
||||
}
|
||||
|
||||
_, r := retrievers.searchRetrieverByKey(k)
|
||||
if r == nil || !r.Contains(k) {
|
||||
return false, nil, nil
|
||||
}
|
||||
|
||||
val, err := r.Get(ctx, k)
|
||||
return true, val, err
|
||||
}
|
||||
|
||||
func (retrievers sortedRetrievers) TryBatchGet(ctx context.Context, keys []kv.Key, collectF func(k kv.Key, v []byte)) ([]kv.Key, error) {
|
||||
if len(retrievers) == 0 {
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
var nonCustomKeys []kv.Key
|
||||
for _, k := range keys {
|
||||
custom, val, err := retrievers.TryGet(ctx, k)
|
||||
if !custom {
|
||||
nonCustomKeys = append(nonCustomKeys, k)
|
||||
continue
|
||||
}
|
||||
|
||||
if kv.ErrNotExist.Equal(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
collectF(k, val)
|
||||
}
|
||||
|
||||
return nonCustomKeys, nil
|
||||
}
|
||||
|
||||
// GetScanRetrievers gets all retrievers who have intersections with range [StartKey, endKey).
|
||||
// If snapshot is not nil, the range between two custom retrievers with a snapshot retriever will also be returned.
|
||||
func (retrievers sortedRetrievers) GetScanRetrievers(startKey, endKey kv.Key, snapshot kv.Retriever) []*RangedKVRetriever {
|
||||
// According to our experience, in most cases there is only one retriever returned.
|
||||
result := make([]*RangedKVRetriever, 0, 1)
|
||||
|
||||
// Firstly, we should find the first retriever whose EndKey is after input startKey,
|
||||
// it is obvious that the retrievers before it do not have a common range with the input.
|
||||
idx, _ := retrievers.searchRetrieverByKey(startKey)
|
||||
|
||||
// If not found, it means the scan range is located out of retrievers, just use snapshot to scan it
|
||||
if idx == len(retrievers) {
|
||||
if snapshot != nil {
|
||||
result = append(result, NewRangeRetriever(snapshot, startKey, endKey))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Check every retriever whose index >= idx whether it intersects with the input range.
|
||||
// If it is true, put the intersected range to the result.
|
||||
// The range between two retrievers should also be checked because we read snapshot data from there.
|
||||
checks := retrievers[idx:]
|
||||
for i, retriever := range checks {
|
||||
// Intersect with the range which is on the left of the retriever and use snapshot to read it
|
||||
// Notice that when len(retriever.StartKey) == 0, that means there is no left range for it
|
||||
if len(retriever.StartKey) > 0 && snapshot != nil {
|
||||
var snapStartKey kv.Key
|
||||
if i != 0 {
|
||||
snapStartKey = checks[i-1].EndKey
|
||||
} else {
|
||||
snapStartKey = nil
|
||||
}
|
||||
|
||||
if r := NewRangeRetriever(snapshot, snapStartKey, retriever.StartKey).Intersect(startKey, endKey); r != nil {
|
||||
result = append(result, r)
|
||||
}
|
||||
}
|
||||
|
||||
// Intersect the current retriever
|
||||
if r := retriever.Intersect(startKey, endKey); r != nil {
|
||||
result = append(result, r)
|
||||
continue
|
||||
}
|
||||
|
||||
// Not necessary to continue when the current retriever does not have a valid intersection
|
||||
return result
|
||||
}
|
||||
|
||||
// If the last retriever has an intersection, we should still check the range on its right.
|
||||
lastRetriever := checks[len(checks)-1]
|
||||
if snapshot != nil && len(lastRetriever.EndKey) > 0 {
|
||||
if r := NewRangeRetriever(snapshot, lastRetriever.EndKey, nil).Intersect(startKey, endKey); r != nil {
|
||||
result = append(result, r)
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// searchRetrieverByKey searches the first retriever whose EndKey after the specified key
|
||||
func (retrievers sortedRetrievers) searchRetrieverByKey(k kv.Key) (int, *RangedKVRetriever) {
|
||||
n := len(retrievers)
|
||||
if n == 0 {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
i := sort.Search(n, func(i int) bool {
|
||||
r := retrievers[i]
|
||||
return len(r.EndKey) == 0 || bytes.Compare(r.EndKey, k) > 0
|
||||
})
|
||||
|
||||
if i < n {
|
||||
return i, retrievers[i]
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
@ -14,10 +14,13 @@
|
||||
package txn
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tikv/client-go/v2/txnkv/transaction"
|
||||
)
|
||||
|
||||
func newRetriever(startKey, endKey kv.Key) *RangedKVRetriever {
|
||||
@ -249,3 +252,463 @@ func TestRangedKVRetrieverIntersect(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRangedKVRetrieverScanCurrentRange(t *testing.T) {
|
||||
memBuffer := newMemBufferRetriever(t, [][]interface{}{{"a1", "v1"}, {"a2", "v2"}, {"a3", "v3"}, {"a4", "v4"}})
|
||||
cases := []struct {
|
||||
retriever *RangedKVRetriever
|
||||
reverse bool
|
||||
result [][]interface{}
|
||||
}{
|
||||
{
|
||||
retriever: NewRangeRetriever(memBuffer, nil, nil),
|
||||
result: [][]interface{}{{"a1", "v1"}, {"a2", "v2"}, {"a3", "v3"}, {"a4", "v4"}},
|
||||
},
|
||||
{
|
||||
retriever: NewRangeRetriever(memBuffer, nil, nil),
|
||||
reverse: true,
|
||||
result: [][]interface{}{{"a4", "v4"}, {"a3", "v3"}, {"a2", "v2"}, {"a1", "v1"}},
|
||||
},
|
||||
{
|
||||
retriever: NewRangeRetriever(memBuffer, kv.Key("a10"), kv.Key("a4")),
|
||||
result: [][]interface{}{{"a2", "v2"}, {"a3", "v3"}},
|
||||
},
|
||||
{
|
||||
retriever: NewRangeRetriever(memBuffer, kv.Key("a10"), kv.Key("a4")),
|
||||
reverse: true,
|
||||
result: [][]interface{}{{"a3", "v3"}, {"a2", "v2"}},
|
||||
},
|
||||
{
|
||||
retriever: NewRangeRetriever(memBuffer, nil, kv.Key("a4")),
|
||||
reverse: true,
|
||||
result: [][]interface{}{{"a3", "v3"}, {"a2", "v2"}, {"a1", "v1"}},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
iter, err := c.retriever.ScanCurrentRange(c.reverse)
|
||||
assert.Nil(t, err)
|
||||
for i := range c.result {
|
||||
expectedKey := makeBytes(c.result[i][0])
|
||||
expectedVal := makeBytes(c.result[i][1])
|
||||
assert.True(t, iter.Valid())
|
||||
gotKey := []byte(iter.Key())
|
||||
gotVal := iter.Value()
|
||||
assert.Equal(t, expectedKey, gotKey)
|
||||
assert.Equal(t, expectedVal, gotVal)
|
||||
err = iter.Next()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
assert.False(t, iter.Valid())
|
||||
}
|
||||
}
|
||||
|
||||
func TestSearchRetrieverByKey(t *testing.T) {
|
||||
retrievers := sortedRetrievers{
|
||||
NewRangeRetriever(&kv.EmptyRetriever{}, kv.Key("ab1"), kv.Key("ab3")),
|
||||
NewRangeRetriever(&kv.EmptyRetriever{}, kv.Key("ab3"), kv.Key("ab5")),
|
||||
NewRangeRetriever(&kv.EmptyRetriever{}, kv.Key("ab6"), kv.Key("ab7")),
|
||||
NewRangeRetriever(&kv.EmptyRetriever{}, kv.Key("ab8"), kv.Key("ab9")),
|
||||
}
|
||||
|
||||
retrievers2 := sortedRetrievers{
|
||||
NewRangeRetriever(&kv.EmptyRetriever{}, nil, kv.Key("ab1")),
|
||||
NewRangeRetriever(&kv.EmptyRetriever{}, kv.Key("ab3"), kv.Key("ab5")),
|
||||
NewRangeRetriever(&kv.EmptyRetriever{}, kv.Key("ab6"), kv.Key("ab7")),
|
||||
NewRangeRetriever(&kv.EmptyRetriever{}, kv.Key("ab8"), nil),
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
retrievers sortedRetrievers
|
||||
search interface{}
|
||||
expectedIdx int
|
||||
}{
|
||||
{retrievers: retrievers, search: nil, expectedIdx: 0},
|
||||
{retrievers: retrievers, search: "ab", expectedIdx: 0},
|
||||
{retrievers: retrievers, search: "ab1", expectedIdx: 0},
|
||||
{retrievers: retrievers, search: "ab2", expectedIdx: 0},
|
||||
{retrievers: retrievers, search: "ab3", expectedIdx: 1},
|
||||
{retrievers: retrievers, search: "ab31", expectedIdx: 1},
|
||||
{retrievers: retrievers, search: "ab4", expectedIdx: 1},
|
||||
{retrievers: retrievers, search: "ab5", expectedIdx: 2},
|
||||
{retrievers: retrievers, search: "ab51", expectedIdx: 2},
|
||||
{retrievers: retrievers, search: "ab71", expectedIdx: 3},
|
||||
{retrievers: retrievers, search: "ab8", expectedIdx: 3},
|
||||
{retrievers: retrievers, search: "ab81", expectedIdx: 3},
|
||||
{retrievers: retrievers, search: "ab9", expectedIdx: 4},
|
||||
{retrievers: retrievers, search: "aba", expectedIdx: 4},
|
||||
{retrievers: retrievers2, search: nil, expectedIdx: 0},
|
||||
{retrievers: retrievers2, search: "ab0", expectedIdx: 0},
|
||||
{retrievers: retrievers2, search: "ab8", expectedIdx: 3},
|
||||
{retrievers: retrievers2, search: "ab9", expectedIdx: 3},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
idx, r := c.retrievers.searchRetrieverByKey(makeBytes(c.search))
|
||||
assert.Equal(t, c.expectedIdx, idx)
|
||||
if idx < len(c.retrievers) {
|
||||
assert.Equal(t, c.retrievers[idx], r)
|
||||
} else {
|
||||
assert.Nil(t, r)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetScanRetrievers(t *testing.T) {
|
||||
type mockRetriever struct {
|
||||
kv.EmptyRetriever
|
||||
// Avoid zero size struct, make it can be compared for different variables
|
||||
_ interface{}
|
||||
}
|
||||
|
||||
snap := &mockRetriever{}
|
||||
retrievers1 := sortedRetrievers{
|
||||
NewRangeRetriever(&mockRetriever{}, kv.Key("ab1"), kv.Key("ab3")),
|
||||
NewRangeRetriever(&mockRetriever{}, kv.Key("ab3"), kv.Key("ab5")),
|
||||
NewRangeRetriever(&mockRetriever{}, kv.Key("ab6"), kv.Key("ab7")),
|
||||
}
|
||||
|
||||
retrievers2 := sortedRetrievers{
|
||||
NewRangeRetriever(&mockRetriever{}, nil, kv.Key("ab1")),
|
||||
NewRangeRetriever(&mockRetriever{}, kv.Key("ab3"), kv.Key("ab5")),
|
||||
NewRangeRetriever(&mockRetriever{}, kv.Key("ab5"), kv.Key("ab7")),
|
||||
NewRangeRetriever(&mockRetriever{}, kv.Key("ab8"), nil),
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
retrievers sortedRetrievers
|
||||
startKey interface{}
|
||||
endKey interface{}
|
||||
expected sortedRetrievers
|
||||
}{
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab0", endKey: "ab1",
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(snap, kv.Key("ab0"), kv.Key("ab1")),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab0", endKey: "ab2",
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(snap, kv.Key("ab0"), kv.Key("ab1")),
|
||||
NewRangeRetriever(retrievers1[0].Retriever, kv.Key("ab1"), kv.Key("ab2")),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab0", endKey: "ab3",
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(snap, kv.Key("ab0"), kv.Key("ab1")),
|
||||
NewRangeRetriever(retrievers1[0].Retriever, kv.Key("ab1"), kv.Key("ab3")),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab0", endKey: "ab4",
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(snap, kv.Key("ab0"), kv.Key("ab1")),
|
||||
NewRangeRetriever(retrievers1[0].Retriever, kv.Key("ab1"), kv.Key("ab3")),
|
||||
NewRangeRetriever(retrievers1[1].Retriever, kv.Key("ab3"), kv.Key("ab4")),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab0", endKey: "ab51",
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(snap, kv.Key("ab0"), kv.Key("ab1")),
|
||||
NewRangeRetriever(retrievers1[0].Retriever, kv.Key("ab1"), kv.Key("ab3")),
|
||||
NewRangeRetriever(retrievers1[1].Retriever, kv.Key("ab3"), kv.Key("ab5")),
|
||||
NewRangeRetriever(snap, kv.Key("ab5"), kv.Key("ab51")),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab0", endKey: "ab61",
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(snap, kv.Key("ab0"), kv.Key("ab1")),
|
||||
NewRangeRetriever(retrievers1[0].Retriever, kv.Key("ab1"), kv.Key("ab3")),
|
||||
NewRangeRetriever(retrievers1[1].Retriever, kv.Key("ab3"), kv.Key("ab5")),
|
||||
NewRangeRetriever(snap, kv.Key("ab5"), kv.Key("ab6")),
|
||||
NewRangeRetriever(retrievers1[2].Retriever, kv.Key("ab6"), kv.Key("ab61")),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab0", endKey: "ab8",
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(snap, kv.Key("ab0"), kv.Key("ab1")),
|
||||
NewRangeRetriever(retrievers1[0].Retriever, kv.Key("ab1"), kv.Key("ab3")),
|
||||
NewRangeRetriever(retrievers1[1].Retriever, kv.Key("ab3"), kv.Key("ab5")),
|
||||
NewRangeRetriever(snap, kv.Key("ab5"), kv.Key("ab6")),
|
||||
NewRangeRetriever(retrievers1[2].Retriever, kv.Key("ab6"), kv.Key("ab7")),
|
||||
NewRangeRetriever(snap, kv.Key("ab7"), kv.Key("ab8")),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: nil, endKey: nil,
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(snap, nil, kv.Key("ab1")),
|
||||
NewRangeRetriever(retrievers1[0].Retriever, kv.Key("ab1"), kv.Key("ab3")),
|
||||
NewRangeRetriever(retrievers1[1].Retriever, kv.Key("ab3"), kv.Key("ab5")),
|
||||
NewRangeRetriever(snap, kv.Key("ab5"), kv.Key("ab6")),
|
||||
NewRangeRetriever(retrievers1[2].Retriever, kv.Key("ab6"), kv.Key("ab7")),
|
||||
NewRangeRetriever(snap, kv.Key("ab7"), nil),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab0", endKey: nil,
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(snap, kv.Key("ab0"), kv.Key("ab1")),
|
||||
NewRangeRetriever(retrievers1[0].Retriever, kv.Key("ab1"), kv.Key("ab3")),
|
||||
NewRangeRetriever(retrievers1[1].Retriever, kv.Key("ab3"), kv.Key("ab5")),
|
||||
NewRangeRetriever(snap, kv.Key("ab5"), kv.Key("ab6")),
|
||||
NewRangeRetriever(retrievers1[2].Retriever, kv.Key("ab6"), kv.Key("ab7")),
|
||||
NewRangeRetriever(snap, kv.Key("ab7"), nil),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab2", endKey: nil,
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(retrievers1[0].Retriever, kv.Key("ab2"), kv.Key("ab3")),
|
||||
NewRangeRetriever(retrievers1[1].Retriever, kv.Key("ab3"), kv.Key("ab5")),
|
||||
NewRangeRetriever(snap, kv.Key("ab5"), kv.Key("ab6")),
|
||||
NewRangeRetriever(retrievers1[2].Retriever, kv.Key("ab6"), kv.Key("ab7")),
|
||||
NewRangeRetriever(snap, kv.Key("ab7"), nil),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab3", endKey: nil,
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(retrievers1[1].Retriever, kv.Key("ab3"), kv.Key("ab5")),
|
||||
NewRangeRetriever(snap, kv.Key("ab5"), kv.Key("ab6")),
|
||||
NewRangeRetriever(retrievers1[2].Retriever, kv.Key("ab6"), kv.Key("ab7")),
|
||||
NewRangeRetriever(snap, kv.Key("ab7"), nil),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab51", endKey: nil,
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(snap, kv.Key("ab51"), kv.Key("ab6")),
|
||||
NewRangeRetriever(retrievers1[2].Retriever, kv.Key("ab6"), kv.Key("ab7")),
|
||||
NewRangeRetriever(snap, kv.Key("ab7"), nil),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab3", endKey: "ab4",
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(retrievers1[1].Retriever, kv.Key("ab3"), kv.Key("ab4")),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab51", endKey: "ab52",
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(snap, kv.Key("ab51"), kv.Key("ab52")),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab8", endKey: nil,
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(snap, kv.Key("ab8"), nil),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers1,
|
||||
startKey: "ab8", endKey: "ab9",
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(snap, kv.Key("ab8"), kv.Key("ab9")),
|
||||
},
|
||||
},
|
||||
{
|
||||
retrievers: retrievers2,
|
||||
startKey: "ab0", endKey: "ab9",
|
||||
expected: sortedRetrievers{
|
||||
NewRangeRetriever(retrievers2[0].Retriever, kv.Key("ab0"), kv.Key("ab1")),
|
||||
NewRangeRetriever(snap, kv.Key("ab1"), kv.Key("ab3")),
|
||||
NewRangeRetriever(retrievers2[1].Retriever, kv.Key("ab3"), kv.Key("ab5")),
|
||||
NewRangeRetriever(retrievers2[2].Retriever, kv.Key("ab5"), kv.Key("ab7")),
|
||||
NewRangeRetriever(snap, kv.Key("ab7"), kv.Key("ab8")),
|
||||
NewRangeRetriever(retrievers2[3].Retriever, kv.Key("ab8"), kv.Key("ab9")),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
result := c.retrievers.GetScanRetrievers(makeBytes(c.startKey), makeBytes(c.endKey), snap)
|
||||
expected := c.expected
|
||||
assert.Equal(t, len(expected), len(result))
|
||||
for i := range expected {
|
||||
assert.Same(t, expected[i].Retriever, result[i].Retriever)
|
||||
assert.Equal(t, expected[i].StartKey, result[i].StartKey)
|
||||
assert.Equal(t, expected[i].EndKey, result[i].EndKey)
|
||||
}
|
||||
|
||||
result = c.retrievers.GetScanRetrievers(makeBytes(c.startKey), makeBytes(c.endKey), nil)
|
||||
expected = make([]*RangedKVRetriever, 0)
|
||||
for _, r := range c.expected {
|
||||
if r.Retriever != snap {
|
||||
expected = append(expected, r)
|
||||
}
|
||||
}
|
||||
assert.Equal(t, len(expected), len(result))
|
||||
for i := range expected {
|
||||
assert.Same(t, expected[i].Retriever, result[i].Retriever)
|
||||
assert.Equal(t, expected[i].StartKey, result[i].StartKey)
|
||||
assert.Equal(t, expected[i].EndKey, result[i].EndKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newMemBufferRetriever(t *testing.T, data [][]interface{}) kv.MemBuffer {
|
||||
txn, err := transaction.NewTiKVTxn(nil, nil, 0, "")
|
||||
assert.Nil(t, err)
|
||||
memBuffer := NewTiKVTxn(txn).GetMemBuffer()
|
||||
for _, d := range data {
|
||||
err := memBuffer.Set(makeBytes(d[0]), makeBytes(d[1]))
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
return memBuffer
|
||||
}
|
||||
|
||||
func TestSortedRetrieversTryGet(t *testing.T) {
|
||||
retrievers := sortedRetrievers{
|
||||
NewRangeRetriever(
|
||||
newMemBufferRetriever(t, [][]interface{}{{"ab0", "v0"}, {"ab2", "v2"}, {"ab3", "v3"}, {"ab4", "v4x"}}),
|
||||
kv.Key("ab1"), kv.Key("ab3"),
|
||||
),
|
||||
NewRangeRetriever(
|
||||
newMemBufferRetriever(t, [][]interface{}{{"ab4", "v4"}, {"ab41", "v41"}}),
|
||||
kv.Key("ab3"), kv.Key("ab5"),
|
||||
),
|
||||
NewRangeRetriever(
|
||||
newMemBufferRetriever(t, [][]interface{}{{"ab7", "v7"}}),
|
||||
kv.Key("ab6"), kv.Key("ab8"),
|
||||
),
|
||||
}
|
||||
|
||||
tryGetCases := [][]interface{}{
|
||||
// {key, expectedValue, fromRetriever}
|
||||
{"ab0", nil, false},
|
||||
{"ab1", kv.ErrNotExist, true},
|
||||
{"ab11", kv.ErrNotExist, true},
|
||||
{"ab2", "v2", true},
|
||||
{"ab3", kv.ErrNotExist, true},
|
||||
{"ab4", "v4", true},
|
||||
{"ab41", "v41", true},
|
||||
{"ab5", nil, false},
|
||||
{"ab7", "v7", true},
|
||||
{"ab8", nil, false},
|
||||
{"ab9", nil, false},
|
||||
}
|
||||
|
||||
for _, c := range tryGetCases {
|
||||
fromRetriever, val, err := retrievers.TryGet(context.TODO(), makeBytes(c[0]))
|
||||
assert.Equal(t, c[2], fromRetriever)
|
||||
if !fromRetriever {
|
||||
assert.Nil(t, err)
|
||||
assert.Nil(t, val)
|
||||
continue
|
||||
}
|
||||
|
||||
if expectedErr, ok := c[1].(error); ok {
|
||||
assert.True(t, errors.ErrorEqual(expectedErr, err))
|
||||
assert.Nil(t, val)
|
||||
} else {
|
||||
assert.Equal(t, makeBytes(c[1]), val)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSortedRetrieversTryBatchGet(t *testing.T) {
|
||||
retrievers := sortedRetrievers{
|
||||
NewRangeRetriever(
|
||||
newMemBufferRetriever(t, [][]interface{}{{"ab0", "v0"}, {"ab2", "v2"}, {"ab3", "v3"}, {"ab4", "v4x"}}),
|
||||
kv.Key("ab1"), kv.Key("ab3"),
|
||||
),
|
||||
NewRangeRetriever(
|
||||
newMemBufferRetriever(t, [][]interface{}{{"ab4", "v4"}, {"ab41", "v41"}, {"ab51", "v51"}}),
|
||||
kv.Key("ab3"), kv.Key("ab5"),
|
||||
),
|
||||
NewRangeRetriever(
|
||||
newMemBufferRetriever(t, [][]interface{}{{"ab7", "v7"}}),
|
||||
kv.Key("ab6"), kv.Key("ab8"),
|
||||
),
|
||||
}
|
||||
|
||||
tryBatchGetCases := []struct {
|
||||
keys []kv.Key
|
||||
result map[string]string
|
||||
retKeys []kv.Key
|
||||
}{
|
||||
{
|
||||
keys: []kv.Key{kv.Key("ab0")},
|
||||
result: map[string]string{},
|
||||
retKeys: []kv.Key{kv.Key("ab0")},
|
||||
},
|
||||
{
|
||||
keys: []kv.Key{kv.Key("ab0"), kv.Key("ab51"), kv.Key("ab52"), kv.Key("ab9")},
|
||||
result: map[string]string{},
|
||||
retKeys: []kv.Key{kv.Key("ab0"), kv.Key("ab51"), kv.Key("ab52"), kv.Key("ab9")},
|
||||
},
|
||||
{
|
||||
keys: []kv.Key{kv.Key("ab21"), kv.Key("ab3"), kv.Key("ab4"), kv.Key("ab41"), kv.Key("ab7")},
|
||||
result: map[string]string{
|
||||
"ab4": "v4",
|
||||
"ab41": "v41",
|
||||
"ab7": "v7",
|
||||
},
|
||||
retKeys: nil,
|
||||
},
|
||||
{
|
||||
keys: []kv.Key{kv.Key("ab0"), kv.Key("ab2"), kv.Key("ab51"), kv.Key("ab7"), kv.Key("ab9")},
|
||||
result: map[string]string{
|
||||
"ab2": "v2",
|
||||
"ab7": "v7",
|
||||
},
|
||||
retKeys: []kv.Key{kv.Key("ab0"), kv.Key("ab51"), kv.Key("ab9")},
|
||||
},
|
||||
{
|
||||
keys: []kv.Key{kv.Key("ab2"), kv.Key("ab4"), kv.Key("ab51"), kv.Key("ab52"), kv.Key("ab6"), kv.Key("ab7"), kv.Key("ab9")},
|
||||
result: map[string]string{
|
||||
"ab2": "v2",
|
||||
"ab4": "v4",
|
||||
"ab7": "v7",
|
||||
},
|
||||
retKeys: []kv.Key{kv.Key("ab51"), kv.Key("ab52"), kv.Key("ab9")},
|
||||
},
|
||||
{
|
||||
keys: []kv.Key{kv.Key("ab0"), kv.Key("ab51"), kv.Key("ab6"), kv.Key("ab7"), kv.Key("ab8"), kv.Key("ab9")},
|
||||
result: map[string]string{
|
||||
"ab7": "v7",
|
||||
},
|
||||
retKeys: []kv.Key{kv.Key("ab0"), kv.Key("ab51"), kv.Key("ab8"), kv.Key("ab9")},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range tryBatchGetCases {
|
||||
got := make(map[string][]byte)
|
||||
keys, err := retrievers.TryBatchGet(context.TODO(), c.keys, func(k kv.Key, v []byte) {
|
||||
_, ok := got[string(k)]
|
||||
assert.False(t, ok)
|
||||
got[string(k)] = v
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, c.retKeys, keys)
|
||||
assert.Equal(t, len(c.result), len(got))
|
||||
for k, v := range c.result {
|
||||
val, ok := got[k]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []byte(v), val)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user