// File listing metadata:
//   tidb/pkg/util/prefetch/reader_test.go
//   158 lines, 4.2 KiB, Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prefetch
import (
"bytes"
"io"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// TestBasic reads an in-memory source through the prefetch reader and checks
// that the bytes come back in order, and that io.EOF is reported once the
// source is exhausted. It covers reads smaller than, equal to, and larger than
// the prefetch size, as well as a prefetch size larger than the source.
func TestBasic(t *testing.T) {
	// Scenario 1: an 11-byte source with a prefetch size of 3, consumed by
	// reads of growing size.
	src := bytes.NewReader([]byte("01234567890"))
	reader := NewReader(io.NopCloser(src), src.Size(), 3)
	steps := []struct {
		bufLen int
		want   string
	}{
		{bufLen: 1, want: "0"},
		{bufLen: 2, want: "12"},
		{bufLen: 3, want: "345"},
		{bufLen: 4, want: "6789"},
		// Only one byte is left, so the 4-byte buffer is partially filled.
		{bufLen: 4, want: "0"},
	}
	for _, step := range steps {
		p := make([]byte, step.bufLen)
		got, readErr := reader.Read(p)
		require.NoError(t, readErr)
		require.EqualValues(t, len(step.want), got)
		require.EqualValues(t, step.want, p[:got])
	}
	_, eofErr := reader.Read(make([]byte, 4))
	require.ErrorIs(t, eofErr, io.EOF)

	// Scenario 2: the same source and prefetch size, consumed by one read that
	// is as large as the whole source.
	src = bytes.NewReader([]byte("01234567890"))
	reader = NewReader(io.NopCloser(src), src.Size(), 3)
	whole := make([]byte, 11)
	got, err := reader.Read(whole)
	require.NoError(t, err)
	require.EqualValues(t, 11, got)
	_, err = reader.Read(whole)
	require.ErrorIs(t, err, io.EOF)

	// Scenario 3: a 5-byte source with a prefetch size (100) and a read buffer
	// (11) that are both larger than the source.
	src = bytes.NewReader([]byte("01234"))
	reader = NewReader(io.NopCloser(src), src.Size(), 100)
	oversized := make([]byte, 11)
	got, err = reader.Read(oversized)
	require.NoError(t, err)
	require.EqualValues(t, 5, got)
	_, err = reader.Read(oversized)
	require.ErrorIs(t, err, io.EOF)
}
// unexpectedEOFReader is a test io.Reader whose Read never produces data and
// always fails with io.ErrUnexpectedEOF.
type unexpectedEOFReader struct{}

// Read ignores the supplied buffer and reports zero bytes together with
// io.ErrUnexpectedEOF.
func (*unexpectedEOFReader) Read([]byte) (int, error) {
	return 0, io.ErrUnexpectedEOF
}
func TestConvertUnexpectedEOF(t *testing.T) {
r := NewReader(io.NopCloser(&unexpectedEOFReader{}), 10, 10)
buf := make([]byte, 10)
_, err := r.Read(buf)
// prefetch reader should not convert underlying io.ErrUnexpectedEOF to io.EOF
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
}
// TestCloseBeforeDrainRead checks that closing the prefetch reader without
// having read anything from it returns no error.
func TestCloseBeforeDrainRead(t *testing.T) {
	// 1024 zero bytes of source with a prefetch size of 2.
	src := bytes.NewReader(make([]byte, 1024))
	reader := NewReader(io.NopCloser(src), src.Size(), 2)

	require.NoError(t, reader.Close())
}
// fragmentReader is a test io.Reader that hands out data in pieces of at most
// fragSize bytes per Read call, even when the caller's buffer is larger.
type fragmentReader struct {
	// data is the full content to serve.
	data []byte
	// i is the offset in data of the next byte to serve. It is an atomic so
	// that the offset can be loaded while Read is being called elsewhere, as
	// TestFillPrefetchBuffer does when polling for progress.
	i atomic.Int64
	// fragSize is the upper bound on the bytes returned by a single Read.
	fragSize int
}

// Read copies the next fragment of data into p and advances the offset by the
// number of bytes copied. A fragment is limited by len(p), by fragSize, and by
// the bytes that remain in data. Once every byte has been served, Read returns
// 0 and io.EOF.
func (r *fragmentReader) Read(p []byte) (n int, err error) {
	i := int(r.i.Load())
	if i >= len(r.data) {
		return 0, io.EOF
	}
	// Clamp to the bytes actually left. Without the clamp the last fragment
	// would slice past len(r.data): that panics when cap(r.data) == len(r.data)
	// and otherwise leaks bytes from the spare capacity and overshoots the
	// offset.
	copySize := min(len(p), r.fragSize, len(r.data)-i)
	n = copy(p, r.data[i:i+copySize])
	r.i.Add(int64(n))
	return n, nil
}
// TestFillPrefetchBuffer checks, by watching how far the underlying
// fragmentReader has been consumed, that the prefetch reader keeps filling its
// internal buffers even though the source only returns 3 bytes per Read.
func TestFillPrefetchBuffer(t *testing.T) {
	newSource := func() *fragmentReader {
		return &fragmentReader{
			data:     []byte("0123456789"),
			fragSize: 3,
		}
	}

	// Sanity-check fragmentReader on its own: a read is capped by fragSize
	// when the buffer is larger, and by the buffer when the buffer is smaller.
	probe := newSource()
	p := make([]byte, 5)
	got, err := probe.Read(p)
	require.NoError(t, err)
	require.Equal(t, 3, got)
	require.Equal(t, "012", string(p[:got]))

	p = make([]byte, 1)
	got, err = probe.Read(p)
	require.NoError(t, err)
	require.Equal(t, 1, got)
	require.Equal(t, "3", string(p[:got]))

	src := newSource()
	// Polls until the source has been consumed up to exactly the given offset,
	// failing the test if that does not happen within one second.
	consumedReaches := func(offset int64) {
		require.Eventually(t, func() bool {
			return src.i.Load() == offset
		}, time.Second, 10*time.Millisecond)
	}

	// When prefetch is 10B, the two internal ping-pong buffers are 5B.
	reader := NewReader(io.NopCloser(src), int64(len(src.data)), 10)

	// Before any read happens, one of the ping-pong buffers is fully filled.
	consumedReaches(5)

	// After any small read, one ping-pong buffer is serving that read and the
	// other buffer is fully filled.
	p = make([]byte, 2)
	got, err = reader.Read(p)
	require.NoError(t, err)
	require.EqualValues(t, 2, got)
	require.EqualValues(t, "01", string(p[:got]))
	consumedReaches(10)
}