From a06bcc60e1c10ebf00a63319a8bfbc8d160416b2 Mon Sep 17 00:00:00 2001
From: D3Hunter
Date: Tue, 29 Aug 2023 14:18:38 +0800
Subject: [PATCH] br/lightning: add DecompressConfig to external storage
 (#46430)

ref pingcap/tidb#42930
---
 br/pkg/lightning/importer/chunk_process.go |  2 +-
 br/pkg/lightning/importer/get_pre_info.go  |  4 +--
 br/pkg/lightning/mydump/loader.go          |  2 +-
 br/pkg/lightning/mydump/parser.go          |  3 +-
 br/pkg/lightning/mydump/reader.go          |  2 +-
 br/pkg/storage/BUILD.bazel                 |  3 +-
 br/pkg/storage/compress.go                 | 24 ++++++++++------
 br/pkg/storage/compress_test.go            |  4 +--
 br/pkg/storage/writer.go                   | 15 ++++++++--
 br/pkg/storage/writer_test.go              | 32 ++++++++++++++++++++--
 dumpling/export/writer_util.go             |  4 +--
 executor/importer/chunk_process.go         |  8 ++++--
 executor/importer/import.go                |  2 +-
 executor/importer/table_import.go          |  2 +-
 executor/load_data.go                      |  2 +-
 15 files changed, 79 insertions(+), 30 deletions(-)

diff --git a/br/pkg/lightning/importer/chunk_process.go b/br/pkg/lightning/importer/chunk_process.go
index 650500eb49..41aacb2198 100644
--- a/br/pkg/lightning/importer/chunk_process.go
+++ b/br/pkg/lightning/importer/chunk_process.go
@@ -84,7 +84,7 @@ func openParser(
     tblInfo *model.TableInfo,
 ) (mydump.Parser, error) {
     blockBufSize := int64(cfg.Mydumper.ReadBlockSize)
-    reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store)
+    reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store, storage.DecompressConfig{})
     if err != nil {
         return nil, err
     }
diff --git a/br/pkg/lightning/importer/get_pre_info.go b/br/pkg/lightning/importer/get_pre_info.go
index 191fed628b..6bf00dbd56 100644
--- a/br/pkg/lightning/importer/get_pre_info.go
+++ b/br/pkg/lightning/importer/get_pre_info.go
@@ -468,7 +468,7 @@ func (p *PreImportInfoGetterImpl) ReadFirstNRowsByTableName(ctx context.Context,
 // ReadFirstNRowsByFileMeta reads the first N rows of a data file.
 // It implements the PreImportInfoGetter interface.
 func (p *PreImportInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) ([]string, [][]types.Datum, error) {
-    reader, err := mydump.OpenReader(ctx, &dataFileMeta, p.srcStorage)
+    reader, err := mydump.OpenReader(ctx, &dataFileMeta, p.srcStorage, storage.DecompressConfig{})
     if err != nil {
         return nil, nil, errors.Trace(err)
     }
@@ -617,7 +617,7 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable(
         return resultIndexRatio, isRowOrdered, nil
     }
     sampleFile := tableMeta.DataFiles[0].FileMeta
-    reader, err := mydump.OpenReader(ctx, &sampleFile, p.srcStorage)
+    reader, err := mydump.OpenReader(ctx, &sampleFile, p.srcStorage, storage.DecompressConfig{})
     if err != nil {
         return 0.0, false, errors.Trace(err)
     }
diff --git a/br/pkg/lightning/mydump/loader.go b/br/pkg/lightning/mydump/loader.go
index f57d2d0db7..eb6172e0aa 100644
--- a/br/pkg/lightning/mydump/loader.go
+++ b/br/pkg/lightning/mydump/loader.go
@@ -691,7 +691,7 @@ func calculateFileBytes(ctx context.Context,
     }
     defer reader.Close()
 
-    compressReader, err := storage.NewLimitedInterceptReader(reader, compressType, offset)
+    compressReader, err := storage.NewLimitedInterceptReader(reader, compressType, storage.DecompressConfig{}, offset)
     if err != nil {
         return 0, 0, errors.Trace(err)
     }
diff --git a/br/pkg/lightning/mydump/parser.go b/br/pkg/lightning/mydump/parser.go
index ca3ddb27e4..d363818694 100644
--- a/br/pkg/lightning/mydump/parser.go
+++ b/br/pkg/lightning/mydump/parser.go
@@ -650,6 +650,7 @@ func OpenReader(
     ctx context.Context,
     fileMeta *SourceFileMeta,
     store storage.ExternalStorage,
+    decompressCfg storage.DecompressConfig,
 ) (reader storage.ReadSeekCloser, err error) {
     switch {
     case fileMeta.Type == SourceTypeParquet:
@@ -659,7 +660,7 @@ func OpenReader(
         if err2 != nil {
             return nil, err2
         }
-        reader, err = storage.WithCompression(store, compressType).Open(ctx, fileMeta.Path)
+        reader, err = storage.WithCompression(store, compressType, decompressCfg).Open(ctx, fileMeta.Path)
     default:
         reader, err = store.Open(ctx, fileMeta.Path)
     }
diff --git a/br/pkg/lightning/mydump/reader.go b/br/pkg/lightning/mydump/reader.go
index ecf5390140..aab1c37f62 100644
--- a/br/pkg/lightning/mydump/reader.go
+++ b/br/pkg/lightning/mydump/reader.go
@@ -91,7 +91,7 @@ func ExportStatement(ctx context.Context, store storage.ExternalStorage,
         if err != nil {
             return nil, errors.Trace(err)
         }
-        store = storage.WithCompression(store, compressType)
+        store = storage.WithCompression(store, compressType, storage.DecompressConfig{})
     }
     fd, err := store.Open(ctx, sqlFile.FileMeta.Path)
     if err != nil {
diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel
index 719b0dcf35..46aa4efb32 100644
--- a/br/pkg/storage/BUILD.bazel
+++ b/br/pkg/storage/BUILD.bazel
@@ -77,7 +77,7 @@ go_test(
     ],
     embed = [":storage"],
     flaky = True,
-    shard_count = 47,
+    shard_count = 48,
     deps = [
         "//br/pkg/mock",
         "@com_github_aws_aws_sdk_go//aws",
@@ -87,6 +87,7 @@ go_test(
         "@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//:azblob",
         "@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//bloberror",
         "@com_github_fsouza_fake_gcs_server//fakestorage",
+        "@com_github_klauspost_compress//zstd",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_pingcap_kvproto//pkg/brpb",
diff --git a/br/pkg/storage/compress.go b/br/pkg/storage/compress.go
index 544938cc4f..0afb5976b7 100644
--- a/br/pkg/storage/compress.go
+++ b/br/pkg/storage/compress.go
@@ -13,15 +13,20 @@ import (
 
 type withCompression struct {
     ExternalStorage
-    compressType CompressType
+    compressType  CompressType
+    decompressCfg DecompressConfig
 }
 
 // WithCompression returns an ExternalStorage with compress option
-func WithCompression(inner ExternalStorage, compressionType CompressType) ExternalStorage {
+func WithCompression(inner ExternalStorage, compressionType CompressType, cfg DecompressConfig) ExternalStorage {
     if compressionType == NoCompression {
         return inner
     }
-    return &withCompression{ExternalStorage: inner, compressType: compressionType}
+    return &withCompression{
+        ExternalStorage: inner,
+        compressType:    compressionType,
+        decompressCfg:   cfg,
+    }
 }
 
 func (w *withCompression) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
@@ -46,7 +51,7 @@ func (w *withCompression) Open(ctx context.Context, path string) (ExternalFileRe
     if err != nil {
         return nil, errors.Trace(err)
     }
-    uncompressReader, err := InterceptDecompressReader(fileReader, w.compressType)
+    uncompressReader, err := InterceptDecompressReader(fileReader, w.compressType, w.decompressCfg)
     if err != nil {
         return nil, errors.Trace(err)
     }
@@ -73,7 +78,7 @@ func (w *withCompression) ReadFile(ctx context.Context, name string) ([]byte, er
         return data, errors.Trace(err)
     }
     bf := bytes.NewBuffer(data)
-    compressBf, err := newCompressReader(w.compressType, bf)
+    compressBf, err := newCompressReader(w.compressType, w.decompressCfg, bf)
     if err != nil {
         return nil, err
     }
@@ -91,11 +96,12 @@ type compressReader struct {
 // reader on the given io.ReadSeekCloser. Note that the returned
 // io.ReadSeekCloser does not have the property that Seek(0, io.SeekCurrent)
 // equals total bytes Read() if the decompress reader is used.
-func InterceptDecompressReader(fileReader io.ReadSeekCloser, compressType CompressType) (io.ReadSeekCloser, error) {
+func InterceptDecompressReader(fileReader io.ReadSeekCloser, compressType CompressType,
+    cfg DecompressConfig) (io.ReadSeekCloser, error) {
     if compressType == NoCompression {
         return fileReader, nil
     }
-    r, err := newCompressReader(compressType, fileReader)
+    r, err := newCompressReader(compressType, cfg, fileReader)
     if err != nil {
         return nil, errors.Trace(err)
     }
@@ -106,7 +112,7 @@ func InterceptDecompressReader(fileReader io.ReadSeekCloser, compressType Compre
     }, nil
 }
 
-func NewLimitedInterceptReader(fileReader ExternalFileReader, compressType CompressType, n int64) (ExternalFileReader, error) {
+func NewLimitedInterceptReader(fileReader ExternalFileReader, compressType CompressType, cfg DecompressConfig, n int64) (ExternalFileReader, error) {
     newFileReader := fileReader
     if n < 0 {
         return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support negative limit, n: %d", n)
@@ -117,7 +123,7 @@ func NewLimitedInterceptReader(fileReader ExternalFileReader, compressType Compr
             Closer: fileReader,
         }
     }
-    return InterceptDecompressReader(newFileReader, compressType)
+    return InterceptDecompressReader(newFileReader, compressType, cfg)
 }
 
 func (c *compressReader) Seek(offset int64, whence int) (int64, error) {
diff --git a/br/pkg/storage/compress_test.go b/br/pkg/storage/compress_test.go
index 4f7d315eba..000aa1a937 100644
--- a/br/pkg/storage/compress_test.go
+++ b/br/pkg/storage/compress_test.go
@@ -20,7 +20,7 @@ func TestWithCompressReadWriteFile(t *testing.T) {
     ctx := context.Background()
     storage, err := Create(ctx, backend, true)
     require.NoError(t, err)
-    storage = WithCompression(storage, Gzip)
+    storage = WithCompression(storage, Gzip, DecompressConfig{})
compress test" content := "hello,world!" fileName := strings.ReplaceAll(name, " ", "-") + ".txt.gz" @@ -30,7 +30,7 @@ func TestWithCompressReadWriteFile(t *testing.T) { // make sure compressed file is written correctly file, err := os.Open(filepath.Join(dir, fileName)) require.NoError(t, err) - uncompressedFile, err := newCompressReader(Gzip, file) + uncompressedFile, err := newCompressReader(Gzip, DecompressConfig{}, file) require.NoError(t, err) newContent, err := io.ReadAll(uncompressedFile) require.NoError(t, err) diff --git a/br/pkg/storage/writer.go b/br/pkg/storage/writer.go index 003f91451f..e16e1974c6 100644 --- a/br/pkg/storage/writer.go +++ b/br/pkg/storage/writer.go @@ -27,6 +27,13 @@ const ( Zstd ) +// DecompressConfig is the config used for decompression. +type DecompressConfig struct { + // ZStdDecodeConcurrency only used for ZStd decompress, see WithDecoderConcurrency. + // if not 1, ZStd will decode file asynchronously. + ZStdDecodeConcurrency int +} + type flusher interface { Flush() error } @@ -86,14 +93,18 @@ func newCompressWriter(compressType CompressType, w io.Writer) simpleCompressWri } } -func newCompressReader(compressType CompressType, r io.Reader) (io.Reader, error) { +func newCompressReader(compressType CompressType, cfg DecompressConfig, r io.Reader) (io.Reader, error) { switch compressType { case Gzip: return gzip.NewReader(r) case Snappy: return snappy.NewReader(r), nil case Zstd: - return zstd.NewReader(r) + options := []zstd.DOption{} + if cfg.ZStdDecodeConcurrency > 0 { + options = append(options, zstd.WithDecoderConcurrency(cfg.ZStdDecodeConcurrency)) + } + return zstd.NewReader(r, options...) default: return nil, nil } diff --git a/br/pkg/storage/writer_test.go b/br/pkg/storage/writer_test.go index 4f41aeb97a..16cd0ed3c8 100644 --- a/br/pkg/storage/writer_test.go +++ b/br/pkg/storage/writer_test.go @@ -11,6 +11,7 @@ import ( "strings" "testing" + "github.com/klauspost/compress/zstd" "github.com/stretchr/testify/require" ) @@ -102,7 +103,7 @@ func TestCompressReaderWriter(t *testing.T) { ctx := context.Background() storage, err := Create(ctx, backend, true) require.NoError(t, err) - storage = WithCompression(storage, test.compressType) + storage = WithCompression(storage, test.compressType, DecompressConfig{}) suffix := createSuffixString(test.compressType) fileName := strings.ReplaceAll(test.name, " ", "-") + suffix writer, err := storage.Create(ctx, fileName, nil) @@ -119,7 +120,7 @@ func TestCompressReaderWriter(t *testing.T) { // make sure compressed file is written correctly file, err := os.Open(filepath.Join(dir, fileName)) require.NoError(t, err) - r, err := newCompressReader(test.compressType, file) + r, err := newCompressReader(test.compressType, DecompressConfig{}, file) require.NoError(t, err) var bf bytes.Buffer _, err = bf.ReadFrom(r) @@ -168,3 +169,30 @@ func TestCompressReaderWriter(t *testing.T) { } } } + +func TestNewCompressReader(t *testing.T) { + var buf bytes.Buffer + var w io.WriteCloser + var err error + w, err = zstd.NewWriter(&buf) + require.NoError(t, err) + _, err = w.Write([]byte("data")) + require.NoError(t, err) + require.NoError(t, w.Close()) + compressedData := buf.Bytes() + + // default cfg + r, err := newCompressReader(Zstd, DecompressConfig{}, bytes.NewReader(compressedData)) + require.NoError(t, err) + allData, err := io.ReadAll(r) + require.NoError(t, err) + require.Equal(t, "data", string(allData)) + + // sync decode + config := DecompressConfig{ZStdDecodeConcurrency: 1} + r, err = newCompressReader(Zstd, config, 
+    r, err = newCompressReader(Zstd, config, bytes.NewReader(compressedData))
+    require.NoError(t, err)
+    allData, err = io.ReadAll(r)
+    require.NoError(t, err)
+    require.Equal(t, "data", string(allData))
+}
diff --git a/dumpling/export/writer_util.go b/dumpling/export/writer_util.go
index 2a4dbbc1cf..dc34992d9a 100644
--- a/dumpling/export/writer_util.go
+++ b/dumpling/export/writer_util.go
@@ -454,7 +454,7 @@ func writeBytes(tctx *tcontext.Context, writer storage.ExternalFileWriter, p []b
 func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context) error, error) {
     fileName += compressFileSuffix(compressType)
     fullPath := s.URI() + "/" + fileName
-    writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName, nil)
+    writer, err := storage.WithCompression(s, compressType, storage.DecompressConfig{}).Create(tctx, fileName, nil)
     if err != nil {
         tctx.L().Warn("fail to open file",
             zap.String("path", fullPath),
@@ -487,7 +487,7 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage,
     initRoutine := func() error {
         // use separated context pCtx here to make sure context used in ExternalFile won't be canceled before close,
         // which will cause a context canceled error when closing gcs's Writer
-        w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName, nil)
+        w, err := storage.WithCompression(s, compressType, storage.DecompressConfig{}).Create(pCtx, fileName, nil)
         if err != nil {
             pCtx.L().Warn("fail to open file",
                 zap.String("path", fullPath),
diff --git a/executor/importer/chunk_process.go b/executor/importer/chunk_process.go
index 1fb7c74c5d..8765d74433 100644
--- a/executor/importer/chunk_process.go
+++ b/executor/importer/chunk_process.go
@@ -111,9 +111,11 @@ type chunkProcessor struct {
     dataWriter  backend.EngineWriter
     indexWriter backend.EngineWriter
 
-    encoder  kvEncoder
-    kvCodec  tikv.Codec
-    progress *asyncloaddata.Progress
+    encoder  kvEncoder
+    kvCodec  tikv.Codec
+    progress *asyncloaddata.Progress
+    // startOffset is the offset of the first row of interest in this chunk.
+    // Some rows before startOffset might be skipped if skip_rows > 0.
     startOffset int64
     // total duration taken by read/encode/deliver.
diff --git a/executor/importer/import.go b/executor/importer/import.go
index cfa7f51a73..4d0ba9f43e 100644
--- a/executor/importer/import.go
+++ b/executor/importer/import.go
@@ -990,7 +990,7 @@ func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo {
         f := e.dataFiles[i]
         result = append(result, LoadDataReaderInfo{
             Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
-                fileReader, err2 := mydump.OpenReader(ctx, f, e.dataStore)
+                fileReader, err2 := mydump.OpenReader(ctx, f, e.dataStore, storage.DecompressConfig{})
                 if err2 != nil {
                     return nil, exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err2), "Please check the INFILE path is correct")
                 }
diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go
index 1bfb4ac9c3..d486a7482f 100644
--- a/executor/importer/table_import.go
+++ b/executor/importer/table_import.go
@@ -255,7 +255,7 @@ type TableImporter struct {
 func (ti *TableImporter) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) {
     info := LoadDataReaderInfo{
         Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
-            reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, ti.dataStore)
+            reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, ti.dataStore, storage.DecompressConfig{})
             if err != nil {
                 return nil, errors.Trace(err)
             }
diff --git a/executor/load_data.go b/executor/load_data.go
index 2cc00260f9..73caca67e6 100644
--- a/executor/load_data.go
+++ b/executor/load_data.go
@@ -153,7 +153,7 @@ func (e *LoadDataWorker) LoadLocal(ctx context.Context, r io.ReadCloser) error {
     readers := []importer.LoadDataReaderInfo{{
         Opener: func(_ context.Context) (io.ReadSeekCloser, error) {
             addedSeekReader := NewSimpleSeekerOnReadCloser(r)
-            return storage.InterceptDecompressReader(addedSeekReader, compressTp2)
+            return storage.InterceptDecompressReader(addedSeekReader, compressTp2, storage.DecompressConfig{})
         }}}
     return e.load(ctx, readers)
 }
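
A minimal usage sketch (not part of the patch) of the API this diff introduces. The zero-value storage.DecompressConfig, which every call site above passes, keeps the previous decoding behavior; setting ZStdDecodeConcurrency to 1 is forwarded by newCompressReader to zstd.WithDecoderConcurrency(1), which makes zstd decode synchronously in the calling goroutine. The local backend URL and the file name below are made-up placeholders; ParseBackend, Create, WithCompression, and Open are the br/pkg/storage entry points used in the diff itself.

package main

import (
    "context"
    "fmt"
    "io"

    "github.com/pingcap/tidb/br/pkg/storage"
)

func main() {
    ctx := context.Background()

    // Hypothetical local backend and object name, for illustration only.
    backend, err := storage.ParseBackend("local:///tmp/import-data", nil)
    if err != nil {
        panic(err)
    }
    store, err := storage.Create(ctx, backend, true)
    if err != nil {
        panic(err)
    }

    // ZStdDecodeConcurrency = 1 is forwarded to zstd.WithDecoderConcurrency(1),
    // forcing synchronous decoding; leaving it 0 keeps the zstd decoder's
    // default (asynchronous) behavior, matching the storage.DecompressConfig{}
    // passed at the call sites in this patch.
    cfg := storage.DecompressConfig{ZStdDecodeConcurrency: 1}

    reader, err := storage.WithCompression(store, storage.Zstd, cfg).Open(ctx, "chunk-0.sql.zst")
    if err != nil {
        panic(err)
    }
    defer reader.Close()

    data, err := io.ReadAll(reader)
    if err != nil {
        panic(err)
    }
    fmt.Printf("read %d uncompressed bytes\n", len(data))
}

Threading the config through WithCompression rather than a package-level default lets each reader choose its own decoding mode, which is presumably why the patch adds an explicit storage.DecompressConfig{} argument at every existing call site instead of a global knob.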