br/lightning: add DecompressConfig to external storage (#46430)
ref pingcap/tidb#42930
@@ -84,7 +84,7 @@ func openParser(
 	tblInfo *model.TableInfo,
 ) (mydump.Parser, error) {
 	blockBufSize := int64(cfg.Mydumper.ReadBlockSize)
-	reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store)
+	reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store, storage.DecompressConfig{})
 	if err != nil {
 		return nil, err
 	}
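Every read-path call site in this diff gains a trailing storage.DecompressConfig argument; passing the zero value keeps the previous behavior. A minimal sketch of the two options a caller now has (the type and its single field are introduced further down in this diff):

	defaultCfg := storage.DecompressConfig{}                      // zero value: library defaults (async zstd decode)
	syncCfg := storage.DecompressConfig{ZStdDecodeConcurrency: 1} // force synchronous zstd decode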
@@ -468,7 +468,7 @@ func (p *PreImportInfoGetterImpl) ReadFirstNRowsByTableName(ctx context.Context,
 // ReadFirstNRowsByFileMeta reads the first N rows of a data file.
 // It implements the PreImportInfoGetter interface.
 func (p *PreImportInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) ([]string, [][]types.Datum, error) {
-	reader, err := mydump.OpenReader(ctx, &dataFileMeta, p.srcStorage)
+	reader, err := mydump.OpenReader(ctx, &dataFileMeta, p.srcStorage, storage.DecompressConfig{})
 	if err != nil {
 		return nil, nil, errors.Trace(err)
 	}
@@ -617,7 +617,7 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable(
 		return resultIndexRatio, isRowOrdered, nil
 	}
 	sampleFile := tableMeta.DataFiles[0].FileMeta
-	reader, err := mydump.OpenReader(ctx, &sampleFile, p.srcStorage)
+	reader, err := mydump.OpenReader(ctx, &sampleFile, p.srcStorage, storage.DecompressConfig{})
 	if err != nil {
 		return 0.0, false, errors.Trace(err)
 	}
@@ -691,7 +691,7 @@ func calculateFileBytes(ctx context.Context,
 	}
 	defer reader.Close()

-	compressReader, err := storage.NewLimitedInterceptReader(reader, compressType, offset)
+	compressReader, err := storage.NewLimitedInterceptReader(reader, compressType, storage.DecompressConfig{}, offset)
 	if err != nil {
 		return 0, 0, errors.Trace(err)
 	}
@@ -650,6 +650,7 @@ func OpenReader(
 	ctx context.Context,
 	fileMeta *SourceFileMeta,
 	store storage.ExternalStorage,
+	decompressCfg storage.DecompressConfig,
 ) (reader storage.ReadSeekCloser, err error) {
 	switch {
 	case fileMeta.Type == SourceTypeParquet:
@@ -659,7 +660,7 @@ func OpenReader(
 		if err2 != nil {
 			return nil, err2
 		}
-		reader, err = storage.WithCompression(store, compressType).Open(ctx, fileMeta.Path)
+		reader, err = storage.WithCompression(store, compressType, decompressCfg).Open(ctx, fileMeta.Path)
 	default:
 		reader, err = store.Open(ctx, fileMeta.Path)
 	}
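With the extra parameter on mydump.OpenReader, a caller that wants non-default zstd decoding can thread a config through explicitly. A hedged sketch (the wrapper function is hypothetical; import paths assume the repository layout at the time of this PR):

	import (
		"context"

		"github.com/pingcap/tidb/br/pkg/lightning/mydump"
		"github.com/pingcap/tidb/br/pkg/storage"
	)

	// openWithSyncZstd opens a source file, forcing synchronous zstd decode.
	func openWithSyncZstd(ctx context.Context, meta *mydump.SourceFileMeta, store storage.ExternalStorage) (storage.ReadSeekCloser, error) {
		cfg := storage.DecompressConfig{ZStdDecodeConcurrency: 1}
		return mydump.OpenReader(ctx, meta, store, cfg)
	}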
@@ -91,7 +91,7 @@ func ExportStatement(ctx context.Context, store storage.ExternalStorage,
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-		store = storage.WithCompression(store, compressType)
+		store = storage.WithCompression(store, compressType, storage.DecompressConfig{})
 	}
 	fd, err := store.Open(ctx, sqlFile.FileMeta.Path)
 	if err != nil {
@@ -77,7 +77,7 @@ go_test(
     ],
     embed = [":storage"],
     flaky = True,
-    shard_count = 47,
+    shard_count = 48,
     deps = [
         "//br/pkg/mock",
         "@com_github_aws_aws_sdk_go//aws",
@@ -87,6 +87,7 @@ go_test(
         "@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//:azblob",
         "@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//bloberror",
         "@com_github_fsouza_fake_gcs_server//fakestorage",
+        "@com_github_klauspost_compress//zstd",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_pingcap_kvproto//pkg/brpb",
@@ -13,15 +13,20 @@ import (

 type withCompression struct {
 	ExternalStorage
-	compressType CompressType
+	compressType  CompressType
+	decompressCfg DecompressConfig
 }

 // WithCompression returns an ExternalStorage with compress option
-func WithCompression(inner ExternalStorage, compressionType CompressType) ExternalStorage {
+func WithCompression(inner ExternalStorage, compressionType CompressType, cfg DecompressConfig) ExternalStorage {
 	if compressionType == NoCompression {
 		return inner
 	}
-	return &withCompression{ExternalStorage: inner, compressType: compressionType}
+	return &withCompression{
+		ExternalStorage: inner,
+		compressType:    compressionType,
+		decompressCfg:   cfg,
+	}
 }

 func (w *withCompression) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
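Because WithCompression still returns the inner storage unchanged for NoCompression, the config only takes effect when a compressed wrapper is actually created, and it is consulted only on the read path (Open and ReadFile in the hunks below). A minimal usage sketch with the default config:

	s := storage.WithCompression(inner, storage.Gzip, storage.DecompressConfig{})
	r, err := s.Open(ctx, "a.sql.gz") // reads are transparently decompressed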
@@ -46,7 +51,7 @@ func (w *withCompression) Open(ctx context.Context, path string) (ExternalFileRe
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	uncompressReader, err := InterceptDecompressReader(fileReader, w.compressType)
+	uncompressReader, err := InterceptDecompressReader(fileReader, w.compressType, w.decompressCfg)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -73,7 +78,7 @@ func (w *withCompression) ReadFile(ctx context.Context, name string) ([]byte, er
 		return data, errors.Trace(err)
 	}
 	bf := bytes.NewBuffer(data)
-	compressBf, err := newCompressReader(w.compressType, bf)
+	compressBf, err := newCompressReader(w.compressType, w.decompressCfg, bf)
 	if err != nil {
 		return nil, err
 	}
@@ -91,11 +96,12 @@ type compressReader struct {
 // reader on the given io.ReadSeekCloser. Note that the returned
 // io.ReadSeekCloser does not have the property that Seek(0, io.SeekCurrent)
 // equals total bytes Read() if the decompress reader is used.
-func InterceptDecompressReader(fileReader io.ReadSeekCloser, compressType CompressType) (io.ReadSeekCloser, error) {
+func InterceptDecompressReader(fileReader io.ReadSeekCloser, compressType CompressType,
+	cfg DecompressConfig) (io.ReadSeekCloser, error) {
 	if compressType == NoCompression {
 		return fileReader, nil
 	}
-	r, err := newCompressReader(compressType, fileReader)
+	r, err := newCompressReader(compressType, cfg, fileReader)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -106,7 +112,7 @@ func InterceptDecompressReader(fileReader io.ReadSeekCloser, compressType Compre
 	}, nil
 }

-func NewLimitedInterceptReader(fileReader ExternalFileReader, compressType CompressType, n int64) (ExternalFileReader, error) {
+func NewLimitedInterceptReader(fileReader ExternalFileReader, compressType CompressType, cfg DecompressConfig, n int64) (ExternalFileReader, error) {
 	newFileReader := fileReader
 	if n < 0 {
 		return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support negative limit, n: %d", n)
@@ -117,7 +123,7 @@ func NewLimitedInterceptReader(fileReader ExternalFileReader, compressType Compr
 			Closer: fileReader,
 		}
 	}
-	return InterceptDecompressReader(newFileReader, compressType)
+	return InterceptDecompressReader(newFileReader, compressType, cfg)
 }

 func (c *compressReader) Seek(offset int64, whence int) (int64, error) {
@@ -20,7 +20,7 @@ func TestWithCompressReadWriteFile(t *testing.T) {
 	ctx := context.Background()
 	storage, err := Create(ctx, backend, true)
 	require.NoError(t, err)
-	storage = WithCompression(storage, Gzip)
+	storage = WithCompression(storage, Gzip, DecompressConfig{})
 	name := "with compress test"
 	content := "hello,world!"
 	fileName := strings.ReplaceAll(name, " ", "-") + ".txt.gz"
@@ -30,7 +30,7 @@ func TestWithCompressReadWriteFile(t *testing.T) {
 	// make sure compressed file is written correctly
 	file, err := os.Open(filepath.Join(dir, fileName))
 	require.NoError(t, err)
-	uncompressedFile, err := newCompressReader(Gzip, file)
+	uncompressedFile, err := newCompressReader(Gzip, DecompressConfig{}, file)
 	require.NoError(t, err)
 	newContent, err := io.ReadAll(uncompressedFile)
 	require.NoError(t, err)
@@ -27,6 +27,13 @@ const (
 	Zstd
 )

+// DecompressConfig is the config used for decompression.
+type DecompressConfig struct {
+	// ZStdDecodeConcurrency only used for ZStd decompress, see WithDecoderConcurrency.
+	// if not 1, ZStd will decode file asynchronously.
+	ZStdDecodeConcurrency int
+}
+
 type flusher interface {
 	Flush() error
 }
@@ -86,14 +93,18 @@ func newCompressWriter(compressType CompressType, w io.Writer) simpleCompressWri
 	}
 }

-func newCompressReader(compressType CompressType, r io.Reader) (io.Reader, error) {
+func newCompressReader(compressType CompressType, cfg DecompressConfig, r io.Reader) (io.Reader, error) {
 	switch compressType {
 	case Gzip:
 		return gzip.NewReader(r)
 	case Snappy:
 		return snappy.NewReader(r), nil
 	case Zstd:
-		return zstd.NewReader(r)
+		options := []zstd.DOption{}
+		if cfg.ZStdDecodeConcurrency > 0 {
+			options = append(options, zstd.WithDecoderConcurrency(cfg.ZStdDecodeConcurrency))
+		}
+		return zstd.NewReader(r, options...)
 	default:
 		return nil, nil
 	}
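zstd.WithDecoderConcurrency is an option of github.com/klauspost/compress/zstd: with a concurrency of 1 the decoder runs synchronously in the calling goroutine, while higher values (or the default) decode blocks asynchronously with readahead, which is what the new field lets callers control. A self-contained sketch of the synchronous path, independent of this package:

	package main

	import (
		"bytes"
		"fmt"
		"io"

		"github.com/klauspost/compress/zstd"
	)

	func main() {
		// Produce a small zstd stream to decode.
		var buf bytes.Buffer
		w, _ := zstd.NewWriter(&buf)
		w.Write([]byte("data"))
		w.Close()

		// Concurrency 1 keeps decoding in the calling goroutine (no background workers).
		r, err := zstd.NewReader(&buf, zstd.WithDecoderConcurrency(1))
		if err != nil {
			panic(err)
		}
		defer r.Close()
		out, _ := io.ReadAll(r)
		fmt.Println(string(out)) // data
	}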
@@ -11,6 +11,7 @@ import (
 	"strings"
 	"testing"

+	"github.com/klauspost/compress/zstd"
 	"github.com/stretchr/testify/require"
 )

@@ -102,7 +103,7 @@ func TestCompressReaderWriter(t *testing.T) {
 	ctx := context.Background()
 	storage, err := Create(ctx, backend, true)
 	require.NoError(t, err)
-	storage = WithCompression(storage, test.compressType)
+	storage = WithCompression(storage, test.compressType, DecompressConfig{})
 	suffix := createSuffixString(test.compressType)
 	fileName := strings.ReplaceAll(test.name, " ", "-") + suffix
 	writer, err := storage.Create(ctx, fileName, nil)
@@ -119,7 +120,7 @@ func TestCompressReaderWriter(t *testing.T) {
 	// make sure compressed file is written correctly
 	file, err := os.Open(filepath.Join(dir, fileName))
 	require.NoError(t, err)
-	r, err := newCompressReader(test.compressType, file)
+	r, err := newCompressReader(test.compressType, DecompressConfig{}, file)
 	require.NoError(t, err)
 	var bf bytes.Buffer
 	_, err = bf.ReadFrom(r)
@@ -168,3 +169,30 @@ func TestCompressReaderWriter(t *testing.T) {
 		}
 	}
 }
+
+func TestNewCompressReader(t *testing.T) {
+	var buf bytes.Buffer
+	var w io.WriteCloser
+	var err error
+	w, err = zstd.NewWriter(&buf)
+	require.NoError(t, err)
+	_, err = w.Write([]byte("data"))
+	require.NoError(t, err)
+	require.NoError(t, w.Close())
+	compressedData := buf.Bytes()
+
+	// default cfg
+	r, err := newCompressReader(Zstd, DecompressConfig{}, bytes.NewReader(compressedData))
+	require.NoError(t, err)
+	allData, err := io.ReadAll(r)
+	require.NoError(t, err)
+	require.Equal(t, "data", string(allData))
+
+	// sync decode
+	config := DecompressConfig{ZStdDecodeConcurrency: 1}
+	r, err = newCompressReader(Zstd, config, bytes.NewReader(compressedData))
+	require.NoError(t, err)
+	allData, err = io.ReadAll(r)
+	require.NoError(t, err)
+	require.Equal(t, "data", string(allData))
+}
@@ -454,7 +454,7 @@ func writeBytes(tctx *tcontext.Context, writer storage.ExternalFileWriter, p []b
 func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context) error, error) {
 	fileName += compressFileSuffix(compressType)
 	fullPath := s.URI() + "/" + fileName
-	writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName, nil)
+	writer, err := storage.WithCompression(s, compressType, storage.DecompressConfig{}).Create(tctx, fileName, nil)
 	if err != nil {
 		tctx.L().Warn("fail to open file",
 			zap.String("path", fullPath),
@@ -487,7 +487,7 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage,
 	initRoutine := func() error {
 		// use separated context pCtx here to make sure context used in ExternalFile won't be canceled before close,
 		// which will cause a context canceled error when closing gcs's Writer
-		w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName, nil)
+		w, err := storage.WithCompression(s, compressType, storage.DecompressConfig{}).Create(pCtx, fileName, nil)
 		if err != nil {
 			pCtx.L().Warn("fail to open file",
 				zap.String("path", fullPath),
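In dumpling only the write path is touched: WithCompression is used here to create compressed writers, and the decompressCfg field is consulted only in Open/ReadFile (see the hunks above), so the zero value is passed purely to satisfy the new signature:

	// Write side: the config is carried but never read when creating writers.
	w, err := storage.WithCompression(s, compressType, storage.DecompressConfig{}).Create(tctx, fileName, nil)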
@@ -111,9 +111,11 @@ type chunkProcessor struct {
 	dataWriter  backend.EngineWriter
 	indexWriter backend.EngineWriter

-	encoder  kvEncoder
-	kvCodec  tikv.Codec
-	progress *asyncloaddata.Progress
+	encoder     kvEncoder
+	kvCodec     tikv.Codec
+	progress    *asyncloaddata.Progress
+	// startOffset is the offset of the first interested row in this chunk.
+	// some rows before startOffset might be skipped if skip_rows > 0.
+	startOffset int64

 	// total duration takes by read/encode/deliver.
@@ -990,7 +990,7 @@ func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo {
 		f := e.dataFiles[i]
 		result = append(result, LoadDataReaderInfo{
 			Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
-				fileReader, err2 := mydump.OpenReader(ctx, f, e.dataStore)
+				fileReader, err2 := mydump.OpenReader(ctx, f, e.dataStore, storage.DecompressConfig{})
 				if err2 != nil {
 					return nil, exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err2), "Please check the INFILE path is correct")
 				}
@@ -255,7 +255,7 @@ type TableImporter struct {
 func (ti *TableImporter) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) {
 	info := LoadDataReaderInfo{
 		Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
-			reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, ti.dataStore)
+			reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, ti.dataStore, storage.DecompressConfig{})
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
@@ -153,7 +153,7 @@ func (e *LoadDataWorker) LoadLocal(ctx context.Context, r io.ReadCloser) error {
 	readers := []importer.LoadDataReaderInfo{{
 		Opener: func(_ context.Context) (io.ReadSeekCloser, error) {
 			addedSeekReader := NewSimpleSeekerOnReadCloser(r)
-			return storage.InterceptDecompressReader(addedSeekReader, compressTp2)
+			return storage.InterceptDecompressReader(addedSeekReader, compressTp2, storage.DecompressConfig{})
 		}}}
 	return e.load(ctx, readers)
 }
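LOAD DATA LOCAL receives a non-seekable stream, so the hunk above first wraps it in a simple seeker before applying the decompress interceptor; condensed (identifiers taken from the hunk), the pattern is:

	seeker := NewSimpleSeekerOnReadCloser(r) // adapt io.ReadCloser to io.ReadSeekCloser
	reader, err := storage.InterceptDecompressReader(seeker, compressTp2, storage.DecompressConfig{})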