// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
	"context"
	goerrors "errors"
	"fmt"
	"io"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/lightning/membuf"
	"github.com/pingcap/tidb/pkg/objstore/objectio"
	"github.com/pingcap/tidb/pkg/objstore/storeapi"
	"github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/pingcap/tidb/pkg/util/size"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"
)

var (
	// ConcurrentReaderBufferSizePerConc is the buffer size of the concurrent
	// reader for each unit of concurrency.
	ConcurrentReaderBufferSizePerConc = int(8 * size.MB)
	// in readAllData, if the expected concurrency is lower than this value,
	// the concurrent reader is not used.
	readAllDataConcThreshold = uint64(4)
)

// byteReader provides structured reading on a byte stream of external storage.
// It can also switch to concurrent reading mode and fetch a larger amount of
// data to improve throughput.
type byteReader struct {
	ctx           context.Context
	storageReader objectio.Reader

	// curBuf is either smallBuf or concurrentReader.largeBuf.
	curBuf       [][]byte
	curBufIdx    int // invariant: 0 <= curBufIdx < len(curBuf) when curBuf contains unread data
	curBufOffset int // invariant: 0 <= curBufOffset < len(curBuf[curBufIdx]) if curBufIdx < len(curBuf)
	smallBuf     []byte

	concurrentReader struct {
		largeBufferPool *membuf.Buffer
		store           storeapi.Storage
		filename        string
		concurrency     int
		bufSizePerConc  int

		now       bool
		expected  bool
		largeBuf  [][]byte
		reader    *concurrentFileReader
		reloadCnt int
	}

	logger               *zap.Logger
	mergeSortReadCounter prometheus.Counter
}

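// openStoreReaderAndSeek opens the object `name` on the given store and
// positions the returned reader at initFileOffset; prefetchSize is passed
// through to the storage layer's reader option.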
func openStoreReaderAndSeek(
	ctx context.Context,
	store storeapi.Storage,
	name string,
	initFileOffset uint64,
	prefetchSize int,
) (objectio.Reader, error) {
	storageReader, err := store.Open(ctx, name, &storeapi.ReaderOption{
		StartOffset:  aws.Int64(int64(initFileOffset)),
		PrefetchSize: prefetchSize,
	})
	if err != nil {
		return nil, err
	}
	return storageReader, nil
}

// newByteReader wraps storageReader to provide readNBytes. If store and
// filename are also given via enableConcurrentRead, this reader can use
// switchConcurrentMode to switch to concurrent reading mode.
func newByteReader(
	ctx context.Context,
	storageReader objectio.Reader,
	bufSize int,
) (r *byteReader, err error) {
	defer func() {
		if err != nil && r != nil {
			_ = r.Close()
		}
	}()
	r = &byteReader{
		ctx:           ctx,
		storageReader: storageReader,
		smallBuf:      make([]byte, bufSize),
		curBufOffset:  0,
	}
	r.curBuf = [][]byte{r.smallBuf}
	r.logger = logutil.Logger(r.ctx)
	// When the storage reader is open, a GET request has been made.
	return r, r.reload()
}

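// A typical lifecycle, as a sketch (ctx, store, the object name, the buffer
// sizes, and pool below are placeholders, not values used by this package):
//
//	rd, err := openStoreReaderAndSeek(ctx, store, "sorted/0", 0, 0)
//	...
//	br, err := newByteReader(ctx, rd, 64*1024)
//	...
//	defer br.Close()
//	// optionally enable concurrent reads; the switch takes effect at reload
//	br.enableConcurrentRead(store, "sorted/0", 8, ConcurrentReaderBufferSizePerConc, pool)
//	_ = br.switchConcurrentMode(true)
//	b, err := br.readNBytes(16)

// enableConcurrentRead records the parameters needed for concurrent reading.
// It does not switch modes by itself; switchConcurrentMode and reload do.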
func (r *byteReader) enableConcurrentRead(
	store storeapi.Storage,
	filename string,
	concurrency int,
	bufSizePerConc int,
	bufferPool *membuf.Buffer,
) {
	r.concurrentReader.store = store
	r.concurrentReader.filename = filename
	r.concurrentReader.concurrency = concurrency
	r.concurrentReader.bufSizePerConc = bufSizePerConc
	r.concurrentReader.largeBufferPool = bufferPool
}

// switchConcurrentMode is used to help implement sortedReader.switchConcurrentMode.
// See the comment of the interface.
func (r *byteReader) switchConcurrentMode(useConcurrent bool) error {
	readerFields := &r.concurrentReader
	if readerFields.store == nil {
		r.logger.Warn("concurrent reader is not enabled, skip switching")
		// the caller doesn't need to care about it.
		return nil
	}
	// need to set it before reload()
	readerFields.expected = useConcurrent
	// concurrent reader will be lazily initialized when reload()
	if useConcurrent {
		return nil
	}

	// no change
	if !readerFields.now {
		return nil
	}

	// The remaining case is that the caller wants to turn off the concurrent
	// reader. We should turn it off immediately to release memory.
	reloadCnt, offsetInOldBuf := r.closeConcurrentReader()
	// here we can assume largeBuf is always fully loaded, because the only exception
	// is it's the end of file. When it's the end of the file, caller will see EOF
	// and no further switchConcurrentMode should be called.
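	// delta is how far the consumer has advanced past storageReader's current
	// offset: the concurrent reader loaded reloadCnt full batches of
	// largeBufSize bytes starting from that offset, and offsetInOldBuf bytes
	// of the last batch were consumed. For example (illustrative numbers),
	// with concurrency=4 and bufSizePerConc=8MiB, largeBufSize is 32MiB; if
	// reloadCnt is 2 and 10MiB of the second batch were consumed, we must
	// seek storageReader forward by 32MiB+10MiB = 42MiB.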
	largeBufSize := readerFields.bufSizePerConc * readerFields.concurrency
	delta := int64(offsetInOldBuf + (reloadCnt-1)*largeBufSize)

	if _, err := r.storageReader.Seek(delta, io.SeekCurrent); err != nil {
		return err
	}
	err := r.reload()
	if goerrors.Is(err, io.EOF) {
		// ignore EOF error, let readNBytes handle it
		return nil
	}
	return err
}

func (r *byteReader) switchToConcurrentReader() error {
	// because it will be called only when the buffered data of storageReader is
	// used up, we can use Seek(0, io.SeekCurrent) to get the starting offset for
	// the concurrent reader
	currOffset, err := r.storageReader.Seek(0, io.SeekCurrent)
	if err != nil {
		return err
	}
	fileSize, err := r.storageReader.GetFileSize()
	if err != nil {
		return err
	}
	readerFields := &r.concurrentReader
	readerFields.reader, err = newConcurrentFileReader(
		r.ctx,
		readerFields.store,
		readerFields.filename,
		currOffset,
		fileSize,
		readerFields.concurrency,
		readerFields.bufSizePerConc,
	)
	if err != nil {
		return err
	}

	readerFields.largeBuf = make([][]byte, readerFields.concurrency)
	for i := range readerFields.largeBuf {
		readerFields.largeBuf[i] = readerFields.largeBufferPool.AllocBytes(readerFields.bufSizePerConc)
		if readerFields.largeBuf[i] == nil {
			return errors.Errorf("alloc large buffer failed, size %d", readerFields.bufSizePerConc)
		}
	}

	r.curBuf = readerFields.largeBuf
	r.curBufOffset = 0
	readerFields.now = true
	return nil
}

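// Note: concurrent mode holds concurrency*bufSizePerConc bytes from
// largeBufferPool (ConcurrentReaderBufferSizePerConc, 8*size.MB, per unit of
// concurrency by default) until closeConcurrentReader releases them.
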
// readNBytes reads the next n bytes from the reader and returns a buffer slice
// containing those bytes. The content of the returned slice may be changed after
// the next call.
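//
// A hypothetical use, decoding an 8-byte big-endian length prefix and then its
// payload (encoding/binary and the record layout are illustrative assumptions,
// not part of this package):
//
//	lenBuf, err := r.readNBytes(8)
//	if err != nil {
//		return err
//	}
//	// decode before the next readNBytes call, which may overwrite lenBuf
//	payload, err := r.readNBytes(int(binary.BigEndian.Uint64(lenBuf)))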
func (r *byteReader) readNBytes(n int) ([]byte, error) {
	if n <= 0 {
		return nil, errors.Errorf("illegal n (%d) when reading from external storage", n)
	}
	if n > int(size.GB) {
		return nil, errors.Errorf("read %d bytes from external storage, exceed max limit %d", n, size.GB)
	}

	readLen, bs := r.next(n)
	if readLen == n && len(bs) == 1 {
		return bs[0], nil
	}
	// need to flatten bs
	auxBuf := make([]byte, n)
	for _, b := range bs {
		copy(auxBuf[len(auxBuf)-n:], b)
		n -= len(b)
	}
	hasRead := readLen > 0
	for n > 0 {
		err := r.reload()
		if err != nil {
			if goerrors.Is(err, io.EOF) && hasRead {
				// EOF is only allowed when we have not read any data
				return nil, errors.Annotatef(io.ErrUnexpectedEOF, "file: %s", r.concurrentReader.filename)
			}
			return nil, errors.Trace(err)
		}
		readLen, bs = r.next(n)
		hasRead = hasRead || readLen > 0
		for _, b := range bs {
			copy(auxBuf[len(auxBuf)-n:], b)
			n -= len(b)
		}
	}
	return auxBuf, nil
}

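// next collects up to n bytes from the buffered data without triggering a
// reload. It returns the number of bytes collected and the sub-slices holding
// them; the sub-slices point into curBuf, so they are only valid until the
// next reload.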
func (r *byteReader) next(n int) (int, [][]byte) {
	retCnt := 0
	// TODO(lance6716): heap escape performance?
	ret := make([][]byte, 0, len(r.curBuf)-r.curBufIdx+1)
	for r.curBufIdx < len(r.curBuf) && n > 0 {
		cur := r.curBuf[r.curBufIdx]
		if r.curBufOffset+n <= len(cur) {
			ret = append(ret, cur[r.curBufOffset:r.curBufOffset+n])
			retCnt += n
			r.curBufOffset += n
			if r.curBufOffset == len(cur) {
				r.curBufIdx++
				r.curBufOffset = 0
			}
			break
		}
		ret = append(ret, cur[r.curBufOffset:])
		retCnt += len(cur) - r.curBufOffset
		n -= len(cur) - r.curBufOffset
		r.curBufIdx++
		r.curBufOffset = 0
	}

	return retCnt, ret
}

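// reload refills curBuf with the next chunk of data. When a switch to
// concurrent mode is pending, it performs the switch first; in concurrent mode
// it reads the next batch into largeBuf, otherwise it refills smallBuf from
// storageReader with retry.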
func (r *byteReader) reload() error {
	if r.mergeSortReadCounter != nil {
		defer func() {
			sz := 0
			for _, b := range r.curBuf {
				sz += len(b)
			}
			r.mergeSortReadCounter.Add(float64(sz))
		}()
	}
	to := r.concurrentReader.expected
	now := r.concurrentReader.now
	// inside reload, only the false -> true switch can happen; turning the
	// concurrent reader off is handled eagerly in switchConcurrentMode
	if !now && to {
		r.logger.Info("switch reader mode", zap.Bool("use concurrent mode", true))
		err := r.switchToConcurrentReader()
		if err != nil {
			return err
		}
	}

	if r.concurrentReader.now {
		r.concurrentReader.reloadCnt++
		buffers, err := r.concurrentReader.reader.read(r.concurrentReader.largeBuf)
		if err != nil {
			return err
		}
		r.curBuf = buffers
		r.curBufIdx = 0
		r.curBufOffset = 0
		return nil
	}

	return util.RunWithRetry(util.DefaultMaxRetries, util.RetryInterval, r.readFromStorageReader)
}

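// readFromStorageReader fills smallBuf (curBuf[0]) from storageReader via
// io.ReadFull. A short read at the end of the file truncates curBuf[0] to the
// bytes actually read; a (0, ErrUnexpectedEOF) result is reported as retryable.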
func (r *byteReader) readFromStorageReader() (retryable bool, err error) {
	// when not using concurrentReader, len(curBuf) == 1
	n, err := io.ReadFull(r.storageReader, r.curBuf[0])
	if err != nil {
		switch {
		case goerrors.Is(err, io.EOF):
			// move curBufIdx so following reads will also find EOF
			r.curBufIdx = len(r.curBuf)
			return false, err
		case goerrors.Is(err, io.ErrUnexpectedEOF):
			if n == 0 {
				r.logger.Warn("encounter (0, ErrUnexpectedEOF) during read, retry it")
				return true, err
			}
			// The last batch.
			r.curBuf[0] = r.curBuf[0][:n]
		case goerrors.Is(err, context.Canceled):
			return false, err
		default:
			r.logger.Warn("other error during read", zap.Error(err))
			return false, err
		}
	}
	r.curBufIdx = 0
	r.curBufOffset = 0
	return false, nil
}

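// closeConcurrentReader releases the large buffers and switches back to
// smallBuf. It returns how many batches were loaded (reloadCnt) and the
// consumed offset within the current batch, so the caller can seek
// storageReader to where the consumer actually is.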
func (r *byteReader) closeConcurrentReader() (reloadCnt, offsetInOldBuffer int) {
	r.logger.Info("drop data in closeConcurrentReader",
		zap.Int("reloadCnt", r.concurrentReader.reloadCnt),
		zap.Int("dropBytes", r.concurrentReader.bufSizePerConc*(len(r.curBuf)-r.curBufIdx)-r.curBufOffset),
		zap.Int("curBufIdx", r.curBufIdx),
	)
	failpoint.Inject("assertReloadAtMostOnce", func() {
		if r.concurrentReader.reloadCnt > 1 {
			panic(fmt.Sprintf("reloadCnt is %d", r.concurrentReader.reloadCnt))
		}
	})
	r.concurrentReader.largeBufferPool.Destroy()
	r.concurrentReader.largeBuf = nil
	r.concurrentReader.now = false
	reloadCnt = r.concurrentReader.reloadCnt
	r.concurrentReader.reloadCnt = 0
	r.curBuf = [][]byte{r.smallBuf}
	offsetInOldBuffer = r.curBufOffset + r.curBufIdx*r.concurrentReader.bufSizePerConc
	r.curBufOffset = 0
	return
}

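// Close releases the concurrent reader's buffers if it is active, then closes
// the underlying storage reader.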
func (r *byteReader) Close() error {
	if r.concurrentReader.now {
		r.closeConcurrentReader()
	}
	return r.storageReader.Close()
}