diff --git a/dumpling/export/BUILD.bazel b/dumpling/export/BUILD.bazel index 5dc1657633..6949e916d1 100644 --- a/dumpling/export/BUILD.bazel +++ b/dumpling/export/BUILD.bazel @@ -37,6 +37,8 @@ go_library( "//pkg/infoschema/context", "//pkg/meta/model", "//pkg/objstore", + "//pkg/objstore/compressedio", + "//pkg/objstore/objectio", "//pkg/parser", "//pkg/parser/ast", "//pkg/parser/format", diff --git a/dumpling/export/config.go b/dumpling/export/config.go index 1756373310..429ba7f74c 100644 --- a/dumpling/export/config.go +++ b/dumpling/export/config.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/compressedio" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/promutil" filter "github.com/pingcap/tidb/pkg/util/table-filter" @@ -134,7 +135,7 @@ type Config struct { EscapeBackslash bool DumpEmptyDatabase bool PosAfterConnect bool - CompressType objstore.CompressType + CompressType compressedio.CompressType Host string Port int @@ -586,7 +587,7 @@ func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } - conf.CompressType, err = ParseCompressType(compressType) + conf.CompressType, err = compressedio.ParseCompressType(compressType) if err != nil { return errors.Trace(err) } @@ -671,22 +672,6 @@ func GetConfTables(tablesList []string) (DatabaseTables, error) { return dbTables, nil } -// ParseCompressType parses compressType string to storage.CompressType -func ParseCompressType(compressType string) (objstore.CompressType, error) { - switch compressType { - case "", "no-compression": - return objstore.NoCompression, nil - case "gzip", "gz": - return objstore.Gzip, nil - case "snappy": - return objstore.Snappy, nil - case "zstd", "zst": - return objstore.Zstd, nil - default: - return objstore.NoCompression, errors.Errorf("unknown compress type %s", compressType) - } -} - // ParseOutputDialect parses output dialect string to Dialect func ParseOutputDialect(outputDialect string) (CSVDialect, error) { switch outputDialect { diff --git a/dumpling/export/metadata.go b/dumpling/export/metadata.go index 95d1bc51c0..72ee5bd8aa 100644 --- a/dumpling/export/metadata.go +++ b/dumpling/export/metadata.go @@ -14,6 +14,7 @@ import ( "github.com/pingcap/tidb/br/pkg/version" tcontext "github.com/pingcap/tidb/dumpling/context" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/compressedio" "go.uber.org/zap" ) @@ -227,7 +228,7 @@ func recordGlobalMetaData(tctx *tcontext.Context, db *sql.Conn, buffer *bytes.Bu func (m *globalMetadata) writeGlobalMetaData() error { // keep consistent with mydumper. 
Never compress metadata - fileWriter, tearDown, err := buildFileWriter(m.tctx, m.storage, metadataPath, objstore.NoCompression) + fileWriter, tearDown, err := buildFileWriter(m.tctx, m.storage, metadataPath, compressedio.NoCompression) if err != nil { return err } diff --git a/dumpling/export/writer_serial_test.go b/dumpling/export/writer_serial_test.go index 43be292809..f04d1deb52 100644 --- a/dumpling/export/writer_serial_test.go +++ b/dumpling/export/writer_serial_test.go @@ -3,6 +3,8 @@ package export import ( + "bytes" + "context" "database/sql/driver" "fmt" "strings" @@ -10,18 +12,53 @@ import ( "github.com/pingcap/errors" tcontext "github.com/pingcap/tidb/dumpling/context" - "github.com/pingcap/tidb/pkg/objstore" "github.com/pingcap/tidb/pkg/util/promutil" "github.com/stretchr/testify/require" ) +// BytesWriter is a Writer implementation on top of bytes.Buffer that is useful for testing. +type BytesWriter struct { + buf *bytes.Buffer +} + +// Write delegates to bytes.Buffer. +func (u *BytesWriter) Write(_ context.Context, p []byte) (int, error) { + return u.buf.Write(p) +} + +// Close delegates to bytes.Buffer. +func (*BytesWriter) Close(_ context.Context) error { + // noop + return nil +} + +// Bytes delegates to bytes.Buffer. +func (u *BytesWriter) Bytes() []byte { + return u.buf.Bytes() +} + +// String delegates to bytes.Buffer. +func (u *BytesWriter) String() string { + return u.buf.String() +} + +// Reset delegates to bytes.Buffer. +func (u *BytesWriter) Reset() { + u.buf.Reset() +} + +// NewBufferWriter creates a Writer that simply writes to a buffer (useful for testing). +func NewBufferWriter() *BytesWriter { + return &BytesWriter{buf: &bytes.Buffer{}} +} + func TestWriteMeta(t *testing.T) { createTableStmt := "CREATE TABLE `t1` (\n" + " `a` int(11) DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;\n" specCmts := []string{"/*!40103 SET TIME_ZONE='+00:00' */;"} meta := newMockMetaIR("t1", createTableStmt, specCmts) - writer := objstore.NewBufferWriter() + writer := NewBufferWriter() err := WriteMeta(tcontext.Background(), meta, writer) require.NoError(t, err) @@ -48,7 +85,7 @@ func TestWriteInsert(t *testing.T) { "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;", } tableIR := newMockTableIR("test", "employee", data, specCmts, colTypes) - bf := objstore.NewBufferWriter() + bf := NewBufferWriter() conf := configForWriteSQL(cfg, UnspecifiedSize, UnspecifiedSize) m := newMetrics(conf.PromFactory, conf.Labels) @@ -86,7 +123,7 @@ func TestWriteInsertReturnsError(t *testing.T) { rowErr := errors.New("mock row error") tableIR := newMockTableIR("test", "employee", data, specCmts, colTypes) tableIR.rowErr = rowErr - bf := objstore.NewBufferWriter() + bf := NewBufferWriter() conf := configForWriteSQL(cfg, UnspecifiedSize, UnspecifiedSize) m := newMetrics(conf.PromFactory, conf.Labels) @@ -117,7 +154,7 @@ func TestWriteInsertInCsv(t *testing.T) { } colTypes := []string{"INT", "SET", "VARCHAR", "VARCHAR", "TEXT"} tableIR := newMockTableIR("test", "employee", data, nil, colTypes) - bf := objstore.NewBufferWriter() + bf := NewBufferWriter() // test nullValue opt := &csvOption{separator: []byte(","), delimiter: []byte{'"'}, nullValue: "\\N", lineTerminator: []byte("\r\n")} @@ -227,7 +264,7 @@ func TestWriteInsertInCsvReturnsError(t *testing.T) { rowErr := errors.New("mock row error") tableIR := newMockTableIR("test", "employee", data, nil, colTypes) tableIR.rowErr = rowErr - bf := objstore.NewBufferWriter() + bf := NewBufferWriter() // test nullValue opt := 
&csvOption{separator: []byte(","), delimiter: []byte{'"'}, nullValue: "\\N", lineTerminator: []byte("\r\n")} @@ -263,7 +300,7 @@ func TestWriteInsertInCsvWithDialect(t *testing.T) { conf.CsvOutputDialect = CSVDialectDefault tableIR := newMockTableIR("test", "employee", data, nil, colTypes) m := newMetrics(conf.PromFactory, conf.Labels) - bf := objstore.NewBufferWriter() + bf := NewBufferWriter() n, err := WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf, m) require.NoError(t, err) require.Equal(t, uint64(4), n) @@ -281,7 +318,7 @@ func TestWriteInsertInCsvWithDialect(t *testing.T) { conf.CsvOutputDialect = CSVDialectRedshift tableIR := newMockTableIR("test", "employee", data, nil, colTypes) m := newMetrics(conf.PromFactory, conf.Labels) - bf := objstore.NewBufferWriter() + bf := NewBufferWriter() n, err := WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf, m) require.NoError(t, err) require.Equal(t, uint64(4), n) @@ -299,7 +336,7 @@ func TestWriteInsertInCsvWithDialect(t *testing.T) { conf.CsvOutputDialect = CSVDialectBigQuery tableIR := newMockTableIR("test", "employee", data, nil, colTypes) m := newMetrics(conf.PromFactory, conf.Labels) - bf := objstore.NewBufferWriter() + bf := NewBufferWriter() n, err := WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf, m) require.NoError(t, err) require.Equal(t, uint64(4), n) @@ -329,7 +366,7 @@ func TestSQLDataTypes(t *testing.T) { tableData := [][]driver.Value{{origin}} colType := []string{sqlType} tableIR := newMockTableIR("test", "t", tableData, nil, colType) - bf := objstore.NewBufferWriter() + bf := NewBufferWriter() conf := configForWriteSQL(cfg, UnspecifiedSize, UnspecifiedSize) m := newMetrics(conf.PromFactory, conf.Labels) diff --git a/dumpling/export/writer_util.go b/dumpling/export/writer_util.go index f35631f867..ab758cf911 100644 --- a/dumpling/export/writer_util.go +++ b/dumpling/export/writer_util.go @@ -17,6 +17,8 @@ import ( tcontext "github.com/pingcap/tidb/dumpling/context" "github.com/pingcap/tidb/dumpling/log" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/compressedio" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -41,11 +43,11 @@ type writerPipe struct { fileSizeLimit uint64 statementSizeLimit uint64 - w objstore.FileWriter + w objectio.Writer } func newWriterPipe( - w objstore.FileWriter, + w objectio.Writer, fileSizeLimit, statementSizeLimit uint64, metrics *metrics, @@ -121,8 +123,8 @@ func (b *writerPipe) ShouldSwitchStatement() bool { (b.statementSizeLimit != UnspecifiedSize && b.currentStatementSize >= b.statementSizeLimit) } -// WriteMeta writes MetaIR to a storage.FileWriter -func WriteMeta(tctx *tcontext.Context, meta MetaIR, w objstore.FileWriter) error { +// WriteMeta writes MetaIR to an objectio.Writer +func WriteMeta(tctx *tcontext.Context, meta MetaIR, w objectio.Writer) error { tctx.L().Debug("start dumping meta data", zap.String("target", meta.TargetName())) specCmtIter := meta.SpecialComments() @@ -140,13 +142,13 @@ func WriteMeta(tctx *tcontext.Context, meta MetaIR, w objstore.FileWriter) error return nil } -// WriteInsert writes TableDataIR to a storage.FileWriter in sql type +// WriteInsert writes TableDataIR to an objectio.Writer in SQL type func WriteInsert( pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR TableDataIR, - w objstore.FileWriter, + w objectio.Writer, metrics *metrics, ) (n uint64, err error) { fileRowIter := tblIR.Rows() @@ 
-288,13 +290,13 @@ func WriteInsert( return counter, wp.Error() } -// WriteInsertInCsv writes TableDataIR to a storage.FileWriter in csv type +// WriteInsertInCsv writes TableDataIR to objectio.Writer in csv type func WriteInsertInCsv( pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR TableDataIR, - w objstore.FileWriter, + w objectio.Writer, metrics *metrics, ) (n uint64, err error) { fileRowIter := tblIR.Rows() @@ -417,7 +419,7 @@ func WriteInsertInCsv( return counter, wp.Error() } -func write(tctx *tcontext.Context, writer objstore.FileWriter, str string) error { +func write(tctx *tcontext.Context, writer objectio.Writer, str string) error { _, err := writer.Write(tctx, []byte(str)) if err != nil { // str might be very long, only output the first 200 chars @@ -429,7 +431,7 @@ func write(tctx *tcontext.Context, writer objstore.FileWriter, str string) error return errors.Trace(err) } -func writeBytes(tctx *tcontext.Context, writer objstore.FileWriter, p []byte) error { +func writeBytes(tctx *tcontext.Context, writer objectio.Writer, p []byte) error { _, err := writer.Write(tctx, p) if err != nil { // str might be very long, only output the first 200 chars @@ -444,10 +446,10 @@ func writeBytes(tctx *tcontext.Context, writer objstore.FileWriter, p []byte) er return errors.Trace(err) } -func buildFileWriter(tctx *tcontext.Context, s objstore.Storage, fileName string, compressType objstore.CompressType) (objstore.FileWriter, func(ctx context.Context) error, error) { - fileName += compressFileSuffix(compressType) +func buildFileWriter(tctx *tcontext.Context, s objstore.Storage, fileName string, compressType compressedio.CompressType) (objectio.Writer, func(ctx context.Context) error, error) { + fileName += compressType.FileSuffix() fullPath := s.URI() + "/" + fileName - writer, err := objstore.WithCompression(s, compressType, objstore.DecompressConfig{}).Create(tctx, fileName, nil) + writer, err := objstore.WithCompression(s, compressType, compressedio.DecompressConfig{}).Create(tctx, fileName, nil) if err != nil { tctx.L().Warn("fail to open file", zap.String("path", fullPath), @@ -472,15 +474,15 @@ func buildFileWriter(tctx *tcontext.Context, s objstore.Storage, fileName string return writer, tearDownRoutine, nil } -func buildInterceptFileWriter(pCtx *tcontext.Context, s objstore.Storage, fileName string, compressType objstore.CompressType) (objstore.FileWriter, func(context.Context) error) { - fileName += compressFileSuffix(compressType) - var writer objstore.FileWriter +func buildInterceptFileWriter(pCtx *tcontext.Context, s objstore.Storage, fileName string, compressType compressedio.CompressType) (objectio.Writer, func(context.Context) error) { + fileName += compressType.FileSuffix() + var writer objectio.Writer fullPath := s.URI() + "/" + fileName fileWriter := &InterceptFileWriter{} initRoutine := func() error { // use separated context pCtx here to make sure context used in ExternalFile won't be canceled before close, // which will cause a context canceled error when closing gcs's Writer - w, err := objstore.WithCompression(s, compressType, objstore.DecompressConfig{}).Create(pCtx, fileName, nil) + w, err := objstore.WithCompression(s, compressType, compressedio.DecompressConfig{}).Create(pCtx, fileName, nil) if err != nil { pCtx.L().Warn("fail to open file", zap.String("path", fullPath), @@ -489,7 +491,7 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s objstore.Storage, fileNa } writer = w pCtx.L().Debug("opened file", zap.String("path", fullPath)) - 
fileWriter.FileWriter = writer + fileWriter.Writer = writer return nil } fileWriter.initRoutine = initRoutine @@ -549,7 +551,7 @@ func newWriterError(err error) error { // InterceptFileWriter is an interceptor of os.File, // tracking whether a StringWriter has written something. type InterceptFileWriter struct { - objstore.FileWriter + objectio.Writer sync.Once SomethingIsWritten bool @@ -557,7 +559,7 @@ type InterceptFileWriter struct { err error } -// Write implements storage.FileWriter.Write. It check whether writer has written something and init a file at first time +// Write implements objectio.Writer. It checks whether writer has written something and init a file at first time func (w *InterceptFileWriter) Write(ctx context.Context, p []byte) (int, error) { w.Do(func() { w.err = w.initRoutine() }) if len(p) > 0 { @@ -566,13 +568,13 @@ func (w *InterceptFileWriter) Write(ctx context.Context, p []byte) (int, error) if w.err != nil { return 0, errors.Annotate(w.err, "open file error") } - n, err := w.FileWriter.Write(ctx, p) + n, err := w.Writer.Write(ctx, p) return n, newWriterError(err) } // Close closes the InterceptFileWriter func (w *InterceptFileWriter) Close(ctx context.Context) error { - return w.FileWriter.Close(ctx) + return w.Writer.Close(ctx) } func wrapBackTicks(identifier string) string { @@ -586,21 +588,6 @@ func wrapStringWith(str string, wrapper string) string { return fmt.Sprintf("%s%s%s", wrapper, str, wrapper) } -func compressFileSuffix(compressType objstore.CompressType) string { - switch compressType { - case objstore.NoCompression: - return "" - case objstore.Gzip: - return ".gz" - case objstore.Snappy: - return ".snappy" - case objstore.Zstd: - return ".zst" - default: - return "" - } -} - // FileFormat is the format that output to file. Currently we support SQL text and CSV file format. 
type FileFormat int32 @@ -647,13 +634,13 @@ func (f FileFormat) Extension() string { } } -// WriteInsert writes TableDataIR to a storage.FileWriter in sql/csv type +// WriteInsert writes TableDataIR to a objectio.Writer in sql/csv type func (f FileFormat) WriteInsert( pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR TableDataIR, - w objstore.FileWriter, + w objectio.Writer, metrics *metrics, ) (uint64, error) { switch f { diff --git a/lightning/pkg/importer/BUILD.bazel b/lightning/pkg/importer/BUILD.bazel index 24140312fc..f0af78a82e 100644 --- a/lightning/pkg/importer/BUILD.bazel +++ b/lightning/pkg/importer/BUILD.bazel @@ -53,6 +53,7 @@ go_library( "//pkg/meta/metabuild", "//pkg/meta/model", "//pkg/objstore", + "//pkg/objstore/compressedio", "//pkg/parser", "//pkg/parser/ast", "//pkg/parser/mysql", diff --git a/lightning/pkg/importer/chunk_process.go b/lightning/pkg/importer/chunk_process.go index d6b436f0bf..98a9e71cdc 100644 --- a/lightning/pkg/importer/chunk_process.go +++ b/lightning/pkg/importer/chunk_process.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/worker" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/compressedio" "github.com/pingcap/tidb/pkg/store/driver/txn" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/tablecodec" @@ -84,7 +85,7 @@ func openParser( tblInfo *model.TableInfo, ) (mydump.Parser, error) { blockBufSize := int64(cfg.Mydumper.ReadBlockSize) - reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store, objstore.DecompressConfig{ + reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store, compressedio.DecompressConfig{ ZStdDecodeConcurrency: 1, }) if err != nil { diff --git a/lightning/pkg/importer/get_pre_info.go b/lightning/pkg/importer/get_pre_info.go index e17db62df4..193b87df9b 100644 --- a/lightning/pkg/importer/get_pre_info.go +++ b/lightning/pkg/importer/get_pre_info.go @@ -45,6 +45,7 @@ import ( "github.com/pingcap/tidb/pkg/meta/metabuild" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/compressedio" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" _ "github.com/pingcap/tidb/pkg/planner/core" // to setup expression.EvalAstExpr. Otherwise we cannot parse the default value @@ -466,7 +467,7 @@ func (p *PreImportInfoGetterImpl) ReadFirstNRowsByTableName(ctx context.Context, // ReadFirstNRowsByFileMeta reads the first N rows of an data file. // It implements the PreImportInfoGetter interface. 
func (p *PreImportInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) ([]string, [][]types.Datum, error) { - reader, err := mydump.OpenReader(ctx, &dataFileMeta, p.srcStorage, objstore.DecompressConfig{ + reader, err := mydump.OpenReader(ctx, &dataFileMeta, p.srcStorage, compressedio.DecompressConfig{ ZStdDecodeConcurrency: 1, }) if err != nil { @@ -623,7 +624,7 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable( return resultIndexRatio, isRowOrdered, nil } sampleFile := tableMeta.DataFiles[0].FileMeta - reader, err := mydump.OpenReader(ctx, &sampleFile, p.srcStorage, objstore.DecompressConfig{ + reader, err := mydump.OpenReader(ctx, &sampleFile, p.srcStorage, compressedio.DecompressConfig{ ZStdDecodeConcurrency: 1, }) if err != nil { diff --git a/pkg/dxf/importinto/conflictedkv/BUILD.bazel b/pkg/dxf/importinto/conflictedkv/BUILD.bazel index 90821dd76f..dd87a4579c 100644 --- a/pkg/dxf/importinto/conflictedkv/BUILD.bazel +++ b/pkg/dxf/importinto/conflictedkv/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//pkg/lightning/verification", "//pkg/meta/model", "//pkg/objstore", + "//pkg/objstore/objectio", "//pkg/table", "//pkg/table/tables", "//pkg/tablecodec", diff --git a/pkg/dxf/importinto/conflictedkv/collector.go b/pkg/dxf/importinto/conflictedkv/collector.go index 793f1a5e7d..ac9c1a0acd 100644 --- a/pkg/dxf/importinto/conflictedkv/collector.go +++ b/pkg/dxf/importinto/conflictedkv/collector.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/lightning/verification" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/types" "go.uber.org/zap" @@ -79,7 +80,7 @@ type Collector struct { fileSeq int currFileSize int64 - writer objstore.FileWriter + writer objectio.Writer } // NewCollector creates a new conflicted KV info collector. 
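(Reviewer note, not part of the patch.) A minimal sketch of how a call site looks after this split, assuming an existing objstore.Storage value; the function name writeCompressed, the package name example, and the file name dump.csv are made up for illustration. Compression parsing now comes from pkg/objstore/compressedio, the file suffix from CompressType.FileSuffix() (formerly compressFileSuffix), and Create on the compression-wrapped storage returns an objectio.Writer (formerly objstore.FileWriter).

package example // hypothetical

import (
	"context"

	"github.com/pingcap/tidb/pkg/objstore"
	"github.com/pingcap/tidb/pkg/objstore/compressedio"
)

func writeCompressed(ctx context.Context, store objstore.Storage) error {
	// Previously export.ParseCompressType / objstore.ParseCompressType.
	ct, err := compressedio.ParseCompressType("zstd")
	if err != nil {
		return err
	}
	// Previously compressFileSuffix(ct); for zstd this appends ".zst".
	name := "dump.csv" + ct.FileSuffix()
	// Create returns an objectio.Writer (previously objstore.FileWriter).
	w, err := objstore.WithCompression(store, ct, compressedio.DecompressConfig{}).Create(ctx, name, nil)
	if err != nil {
		return err
	}
	if _, err := w.Write(ctx, []byte("hello")); err != nil {
		return err
	}
	return w.Close(ctx)
}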
diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index 298f6ae6c8..1834fd481c 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -161,6 +161,7 @@ go_library( "//pkg/meta/model", "//pkg/metrics", "//pkg/objstore", + "//pkg/objstore/compressedio", "//pkg/parser", "//pkg/parser/ast", "//pkg/parser/auth", diff --git a/pkg/executor/importer/BUILD.bazel b/pkg/executor/importer/BUILD.bazel index 90e22df3e8..bace4e1f32 100644 --- a/pkg/executor/importer/BUILD.bazel +++ b/pkg/executor/importer/BUILD.bazel @@ -42,6 +42,7 @@ go_library( "//pkg/meta/model", "//pkg/metrics", "//pkg/objstore", + "//pkg/objstore/compressedio", "//pkg/parser", "//pkg/parser/ast", "//pkg/parser/format", diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index 1e40f3dfed..0e664c14af 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -47,6 +47,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/mydump" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/compressedio" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" pformat "github.com/pingcap/tidb/pkg/parser/format" @@ -1583,7 +1584,7 @@ func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo { f := e.dataFiles[i] result = append(result, LoadDataReaderInfo{ Opener: func(ctx context.Context) (io.ReadSeekCloser, error) { - fileReader, err2 := mydump.OpenReader(ctx, f, e.dataStore, objstore.DecompressConfig{}) + fileReader, err2 := mydump.OpenReader(ctx, f, e.dataStore, compressedio.DecompressConfig{}) if err2 != nil { return nil, exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(errors.GetErrStackMsg(err2), "Please check the INFILE path is correct") } diff --git a/pkg/executor/importer/table_import.go b/pkg/executor/importer/table_import.go index aea5dff6b1..d70fd71831 100644 --- a/pkg/executor/importer/table_import.go +++ b/pkg/executor/importer/table_import.go @@ -47,7 +47,7 @@ import ( verify "github.com/pingcap/tidb/pkg/lightning/verification" "github.com/pingcap/tidb/pkg/meta/autoid" tidbmetrics "github.com/pingcap/tidb/pkg/metrics" - "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/compressedio" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/vardef" "github.com/pingcap/tidb/pkg/sessiontxn" @@ -318,7 +318,7 @@ func (ti *TableImporter) GetKVStore() tidbkv.Storage { func (e *LoadDataController) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) { info := LoadDataReaderInfo{ Opener: func(ctx context.Context) (io.ReadSeekCloser, error) { - reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, e.dataStore, objstore.DecompressConfig{ + reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, e.dataStore, compressedio.DecompressConfig{ ZStdDecodeConcurrency: 1, }) if err != nil { diff --git a/pkg/executor/load_data.go b/pkg/executor/load_data.go index ecbadad1c2..04cbc5d76e 100644 --- a/pkg/executor/load_data.go +++ b/pkg/executor/load_data.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/mydump" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/compressedio" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" @@ -215,7 +216,7 @@ func (e *LoadDataWorker) LoadLocal(ctx context.Context, r io.ReadCloser) 
error { readers := []importer.LoadDataReaderInfo{{ Opener: func(_ context.Context) (io.ReadSeekCloser, error) { addedSeekReader := NewSimpleSeekerOnReadCloser(r) - return objstore.InterceptDecompressReader(addedSeekReader, compressTp2, objstore.DecompressConfig{ + return objstore.InterceptDecompressReader(addedSeekReader, compressTp2, compressedio.DecompressConfig{ ZStdDecodeConcurrency: 1, }) }}} @@ -782,7 +783,7 @@ func (s *SimpleSeekerOnReadCloser) Close() error { return s.r.Close() } -// GetFileSize implements storage.FileReader. +// GetFileSize implements objectio.Reader. func (*SimpleSeekerOnReadCloser) GetFileSize() (int64, error) { return 0, errors.Errorf("unsupported GetFileSize on SimpleSeekerOnReadCloser") } diff --git a/pkg/lightning/backend/external/BUILD.bazel b/pkg/lightning/backend/external/BUILD.bazel index f434f3911d..5edde2385f 100644 --- a/pkg/lightning/backend/external/BUILD.bazel +++ b/pkg/lightning/backend/external/BUILD.bazel @@ -36,6 +36,7 @@ go_library( "//pkg/lightning/metric", "//pkg/metrics", "//pkg/objstore", + "//pkg/objstore/objectio", "//pkg/resourcemanager/pool/workerpool", "//pkg/resourcemanager/util", "//pkg/util", @@ -95,6 +96,7 @@ go_test( "//pkg/lightning/log", "//pkg/lightning/membuf", "//pkg/objstore", + "//pkg/objstore/objectio", "//pkg/resourcemanager/pool/workerpool", "//pkg/testkit/testfailpoint", "//pkg/util", diff --git a/pkg/lightning/backend/external/bench_test.go b/pkg/lightning/backend/external/bench_test.go index 18514a04cd..74c6fcee47 100644 --- a/pkg/lightning/backend/external/bench_test.go +++ b/pkg/lightning/backend/external/bench_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/lightning/membuf" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/size" @@ -61,7 +62,7 @@ func writePlainFile(s *writeTestSuite) { _ = s.store.DeleteFile(ctx, filePath) buf := make([]byte, s.memoryLimit) offset := 0 - flush := func(w objstore.FileWriter) { + flush := func(w objectio.Writer) { n, err := w.Write(ctx, buf[:offset]) intest.AssertNoError(err) intest.Assert(offset == n) diff --git a/pkg/lightning/backend/external/byte_reader.go b/pkg/lightning/backend/external/byte_reader.go index cc526c0766..10018e8af2 100644 --- a/pkg/lightning/backend/external/byte_reader.go +++ b/pkg/lightning/backend/external/byte_reader.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/lightning/membuf" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/size" @@ -46,7 +47,7 @@ var ( // data to improve throughput. type byteReader struct { ctx context.Context - storageReader objstore.FileReader + storageReader objectio.Reader // curBuf is either smallBuf or concurrentReader.largeBuf. curBuf [][]byte @@ -78,7 +79,7 @@ func openStoreReaderAndSeek( name string, initFileOffset uint64, prefetchSize int, -) (objstore.FileReader, error) { +) (objectio.Reader, error) { storageReader, err := store.Open(ctx, name, &objstore.ReaderOption{ StartOffset: aws.Int64(int64(initFileOffset)), PrefetchSize: prefetchSize, @@ -94,7 +95,7 @@ func openStoreReaderAndSeek( // concurrent reading mode. 
func newByteReader( ctx context.Context, - storageReader objstore.FileReader, + storageReader objectio.Reader, bufSize int, ) (r *byteReader, err error) { defer func() { diff --git a/pkg/lightning/backend/external/byte_reader_test.go b/pkg/lightning/backend/external/byte_reader_test.go index 0e13aea82e..6d245621a2 100644 --- a/pkg/lightning/backend/external/byte_reader_test.go +++ b/pkg/lightning/backend/external/byte_reader_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/lightning/membuf" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/stretchr/testify/require" "golang.org/x/exp/rand" ) @@ -73,7 +74,7 @@ func TestByteReader(t *testing.T) { err := st.WriteFile(context.Background(), "testfile", []byte("abcde")) require.NoError(t, err) - newRsc := func() objstore.FileReader { + newRsc := func() objectio.Reader { rsc, err := st.Open(context.Background(), "testfile", nil) require.NoError(t, err) return rsc @@ -170,7 +171,7 @@ func TestUnexpectedEOF(t *testing.T) { err := st.WriteFile(context.Background(), "testfile", []byte("0123456789")) require.NoError(t, err) - newRsc := func() objstore.FileReader { + newRsc := func() objectio.Reader { rsc, err := st.Open(context.Background(), "testfile", nil) require.NoError(t, err) return rsc @@ -199,7 +200,7 @@ func TestEmptyContent(t *testing.T) { err = st.WriteFile(context.Background(), "testfile", []byte("")) require.NoError(t, err) - newRsc := func() objstore.FileReader { + newRsc := func() objectio.Reader { rsc, err := st.Open(context.Background(), "testfile", nil) require.NoError(t, err) return rsc diff --git a/pkg/lightning/backend/external/engine.go b/pkg/lightning/backend/external/engine.go index 458d0dccb8..bc6e783a7f 100644 --- a/pkg/lightning/backend/external/engine.go +++ b/pkg/lightning/backend/external/engine.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/membuf" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" @@ -177,7 +178,7 @@ type Engine struct { recordedDupCnt int recordedDupSize int64 dupFile string - dupWriter objstore.FileWriter + dupWriter objectio.Writer dupKVStore *KeyValueStore } diff --git a/pkg/lightning/backend/external/file.go b/pkg/lightning/backend/external/file.go index 76c27d6a82..0ab112c19a 100644 --- a/pkg/lightning/backend/external/file.go +++ b/pkg/lightning/backend/external/file.go @@ -17,7 +17,7 @@ package external import ( "context" - "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/objectio" ) const ( @@ -29,7 +29,7 @@ const ( // KeyValueStore stores key-value pairs and maintains the range properties. type KeyValueStore struct { - dataWriter objstore.FileWriter + dataWriter objectio.Writer rc *rangePropertiesCollector ctx context.Context @@ -41,7 +41,7 @@ type KeyValueStore struct { // rangePropertiesCollector. 
func NewKeyValueStore( ctx context.Context, - dataWriter objstore.FileWriter, + dataWriter objectio.Writer, rangePropertiesCollector *rangePropertiesCollector, ) *KeyValueStore { kvStore := &KeyValueStore{ diff --git a/pkg/lightning/backend/external/iter_test.go b/pkg/lightning/backend/external/iter_test.go index 4c8a47ff66..5a40baaa48 100644 --- a/pkg/lightning/backend/external/iter_test.go +++ b/pkg/lightning/backend/external/iter_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/lightning/membuf" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -37,7 +38,7 @@ type trackOpenMemStorage struct { opened atomic.Int32 } -func (s *trackOpenMemStorage) Open(ctx context.Context, path string, _ *objstore.ReaderOption) (objstore.FileReader, error) { +func (s *trackOpenMemStorage) Open(ctx context.Context, path string, _ *objstore.ReaderOption) (objectio.Reader, error) { s.opened.Inc() r, err := s.MemStorage.Open(ctx, path, nil) if err != nil { @@ -47,12 +48,12 @@ func (s *trackOpenMemStorage) Open(ctx context.Context, path string, _ *objstore } type trackOpenFileReader struct { - objstore.FileReader + objectio.Reader store *trackOpenMemStorage } func (r *trackOpenFileReader) Close() error { - err := r.FileReader.Close() + err := r.Reader.Close() if err != nil { return err } @@ -320,7 +321,7 @@ func testMergeIterSwitchMode(t *testing.T, f func([]byte, int) []byte) { } type eofReader struct { - objstore.FileReader + objectio.Reader } func (r eofReader) Seek(_ int64, _ int) (int64, error) { @@ -651,7 +652,7 @@ func (s *slowOpenStorage) Open( ctx context.Context, filePath string, o *objstore.ReaderOption, -) (objstore.FileReader, error) { +) (objectio.Reader, error) { time.Sleep(s.sleep) s.openCnt.Inc() return s.MemStorage.Open(ctx, filePath, o) diff --git a/pkg/lightning/backend/external/onefile_writer.go b/pkg/lightning/backend/external/onefile_writer.go index 4154724fd3..7e2bddfb7a 100644 --- a/pkg/lightning/backend/external/onefile_writer.go +++ b/pkg/lightning/backend/external/onefile_writer.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/membuf" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" @@ -73,8 +74,8 @@ type OneFileWriter struct { rnd *rand.Rand dataFile string statFile string - dataWriter objstore.FileWriter - statWriter objstore.FileWriter + dataWriter objectio.Writer + statWriter objectio.Writer onClose OnWriterCloseFunc closed bool @@ -89,7 +90,7 @@ type OneFileWriter struct { // below fields are only used when onDup is OnDuplicateKeyRecord. 
recordedDupCnt int dupFile string - dupWriter objstore.FileWriter + dupWriter objectio.Writer dupKVStore *KeyValueStore minKey []byte diff --git a/pkg/lightning/backend/external/writer.go b/pkg/lightning/backend/external/writer.go index 448ea9ce0e..60468df5e5 100644 --- a/pkg/lightning/backend/external/writer.go +++ b/pkg/lightning/backend/external/writer.go @@ -38,6 +38,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/membuf" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/size" @@ -811,7 +812,7 @@ func (w *Writer) reCalculateKVSize() int64 { func (w *Writer) createStorageWriter(ctx context.Context) ( dataFile, statFile string, - data, stats objstore.FileWriter, + data, stats objectio.Writer, err error, ) { dataPath := filepath.Join(w.getPartitionedPrefix(), strconv.Itoa(w.currentSeq)) @@ -834,7 +835,7 @@ func (w *Writer) createStorageWriter(ctx context.Context) ( return dataPath, statPath, dataWriter, statsWriter, nil } -func (w *Writer) createDupWriter(ctx context.Context) (string, objstore.FileWriter, error) { +func (w *Writer) createDupWriter(ctx context.Context) (string, objectio.Writer, error) { path := filepath.Join(w.getPartitionedPrefix()+dupSuffix, strconv.Itoa(w.currentSeq)) writer, err := w.store.Create(ctx, path, &objstore.WriterOption{ Concurrency: 20, diff --git a/pkg/lightning/backend/external/writer_test.go b/pkg/lightning/backend/external/writer_test.go index e4c0047e88..134cd0e533 100644 --- a/pkg/lightning/backend/external/writer_test.go +++ b/pkg/lightning/backend/external/writer_test.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/lightning/log" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/size" "github.com/stretchr/testify/require" @@ -495,19 +496,19 @@ func (s *writerFirstCloseFailStorage) Create( ctx context.Context, path string, option *objstore.WriterOption, -) (objstore.FileWriter, error) { +) (objectio.Writer, error) { w, err := s.Storage.Create(ctx, path, option) if err != nil { return nil, err } if strings.Contains(path, statSuffix) { - return &firstCloseFailWriter{FileWriter: w, shouldFail: &s.shouldFail}, nil + return &firstCloseFailWriter{Writer: w, shouldFail: &s.shouldFail}, nil } return w, nil } type firstCloseFailWriter struct { - objstore.FileWriter + objectio.Writer shouldFail *bool } @@ -516,7 +517,7 @@ func (w *firstCloseFailWriter) Close(ctx context.Context) error { *w.shouldFail = false return fmt.Errorf("first close fail") } - return w.FileWriter.Close(ctx) + return w.Writer.Close(ctx) } func TestFlushKVsRetry(t *testing.T) { diff --git a/pkg/lightning/mydump/BUILD.bazel b/pkg/lightning/mydump/BUILD.bazel index a1eb0bfbb9..c9c3268415 100644 --- a/pkg/lightning/mydump/BUILD.bazel +++ b/pkg/lightning/mydump/BUILD.bazel @@ -28,6 +28,8 @@ go_library( "//pkg/lightning/metric", "//pkg/lightning/worker", "//pkg/objstore", + "//pkg/objstore/compressedio", + "//pkg/objstore/objectio", "//pkg/parser", "//pkg/parser/ast", "//pkg/parser/charset", diff --git a/pkg/lightning/mydump/loader.go b/pkg/lightning/mydump/loader.go index 710ca516c4..649194c8ef 100644 --- a/pkg/lightning/mydump/loader.go +++ b/pkg/lightning/mydump/loader.go @@ -30,6 +30,7 @@ import ( 
"github.com/pingcap/tidb/pkg/lightning/log" "github.com/pingcap/tidb/pkg/lightning/metric" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/compressedio" "github.com/pingcap/tidb/pkg/util/logutil" regexprrouter "github.com/pingcap/tidb/pkg/util/regexpr-router" filter "github.com/pingcap/tidb/pkg/util/table-filter" @@ -853,7 +854,7 @@ func (l *MDLoader) GetAllFiles() map[string]FileInfo { func calculateFileBytes(ctx context.Context, dataFile string, - compressType objstore.CompressType, + compressType compressedio.CompressType, store objstore.Storage, offset int64) (tot int, pos int64, err error) { bytes := make([]byte, sampleCompressedFileSize) @@ -863,7 +864,7 @@ func calculateFileBytes(ctx context.Context, } defer reader.Close() - decompressConfig := objstore.DecompressConfig{ZStdDecodeConcurrency: 1} + decompressConfig := compressedio.DecompressConfig{ZStdDecodeConcurrency: 1} compressReader, err := objstore.NewLimitedInterceptReader(reader, compressType, decompressConfig, offset) if err != nil { return 0, 0, errors.Trace(err) diff --git a/pkg/lightning/mydump/parquet_writer.go b/pkg/lightning/mydump/parquet_writer.go index 049e373f5c..269d503ad1 100644 --- a/pkg/lightning/mydump/parquet_writer.go +++ b/pkg/lightning/mydump/parquet_writer.go @@ -23,6 +23,7 @@ import ( "github.com/apache/arrow-go/v18/parquet/file" "github.com/apache/arrow-go/v18/parquet/schema" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/objectio" ) // ParquetColumn defines the properties of a column in a Parquet file. @@ -38,7 +39,7 @@ type ParquetColumn struct { } type writeWrapper struct { - Writer objstore.FileWriter + Writer objectio.Writer } func (*writeWrapper) Seek(_ int64, _ int) (int64, error) { diff --git a/pkg/lightning/mydump/parser.go b/pkg/lightning/mydump/parser.go index d186525bad..0ea1925937 100644 --- a/pkg/lightning/mydump/parser.go +++ b/pkg/lightning/mydump/parser.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/metric" "github.com/pingcap/tidb/pkg/lightning/worker" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/compressedio" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/logutil" @@ -676,7 +677,7 @@ func OpenReader( ctx context.Context, fileMeta *SourceFileMeta, store objstore.Storage, - decompressCfg objstore.DecompressConfig, + decompressCfg compressedio.DecompressConfig, ) (reader objstore.ReadSeekCloser, err error) { switch { case fileMeta.Type == SourceTypeParquet: diff --git a/pkg/lightning/mydump/reader.go b/pkg/lightning/mydump/reader.go index 3a3ffe9afc..794c7a5109 100644 --- a/pkg/lightning/mydump/reader.go +++ b/pkg/lightning/mydump/reader.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/lightning/worker" "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/compressedio" "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/spkg/bom" @@ -91,7 +92,7 @@ func ExportStatement(ctx context.Context, store objstore.Storage, if err != nil { return nil, errors.Trace(err) } - store = objstore.WithCompression(store, compressType, objstore.DecompressConfig{ + store = objstore.WithCompression(store, compressType, compressedio.DecompressConfig{ ZStdDecodeConcurrency: 1, }) } diff --git a/pkg/lightning/mydump/router.go b/pkg/lightning/mydump/router.go index 1a800b5baf..1f77dce4d1 100644 --- 
a/pkg/lightning/mydump/router.go +++ b/pkg/lightning/mydump/router.go @@ -25,7 +25,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/lightning/config" "github.com/pingcap/tidb/pkg/lightning/log" - "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/compressedio" "github.com/pingcap/tidb/pkg/util/filter" "go.uber.org/zap" ) @@ -88,18 +88,18 @@ const ( ) // ToStorageCompressType converts Compression to storage.CompressType. -func ToStorageCompressType(compression Compression) (objstore.CompressType, error) { +func ToStorageCompressType(compression Compression) (compressedio.CompressType, error) { switch compression { case CompressionGZ: - return objstore.Gzip, nil + return compressedio.Gzip, nil case CompressionSnappy: - return objstore.Snappy, nil + return compressedio.Snappy, nil case CompressionZStd: - return objstore.Zstd, nil + return compressedio.Zstd, nil case CompressionNone: - return objstore.NoCompression, nil + return compressedio.NoCompression, nil default: - return objstore.NoCompression, + return compressedio.NoCompression, errors.Errorf("compression %d doesn't have related storage compressType", compression) } } diff --git a/pkg/objstore/BUILD.bazel b/pkg/objstore/BUILD.bazel index c3fb84457c..1956be5ac4 100644 --- a/pkg/objstore/BUILD.bazel +++ b/pkg/objstore/BUILD.bazel @@ -22,7 +22,6 @@ go_library( "s3.go", "s3_interface.go", "storage.go", - "writer.go", ], importpath = "github.com/pingcap/tidb/pkg/objstore", visibility = ["//visibility:public"], @@ -32,6 +31,8 @@ go_library( "//br/pkg/utils", "//br/pkg/utils/iter", "//pkg/metrics", + "//pkg/objstore/compressedio", + "//pkg/objstore/objectio", "//pkg/objstore/recording", "//pkg/sessionctx/variable", "//pkg/util", @@ -66,9 +67,6 @@ go_library( "@com_github_docker_go_units//:go-units", "@com_github_go_resty_resty_v2//:resty", "@com_github_google_uuid//:uuid", - "@com_github_klauspost_compress//gzip", - "@com_github_klauspost_compress//snappy", - "@com_github_klauspost_compress//zstd", "@com_github_ks3sdklib_aws_sdk_go//aws", "@com_github_ks3sdklib_aws_sdk_go//aws/awserr", "@com_github_ks3sdklib_aws_sdk_go//aws/credentials", @@ -108,7 +106,6 @@ go_test( "s3_flags_test.go", "s3_test.go", "storage_test.go", - "writer_test.go", ], embed = [":objstore"], flaky = True, @@ -116,6 +113,8 @@ go_test( deps = [ "//br/pkg/mock", "//dumpling/context", + "//pkg/objstore/compressedio", + "//pkg/objstore/objectio", "//pkg/objstore/recording", "//pkg/util/intest", "@com_github_aws_aws_sdk_go_v2//aws", @@ -125,7 +124,6 @@ go_test( "@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//:azblob", "@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//bloberror", "@com_github_fsouza_fake_gcs_server//fakestorage", - "@com_github_klauspost_compress//zstd", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", diff --git a/pkg/objstore/azblob.go b/pkg/objstore/azblob.go index 2394c412c9..e26b0f8724 100644 --- a/pkg/objstore/azblob.go +++ b/pkg/objstore/azblob.go @@ -43,6 +43,8 @@ import ( "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/pkg/objstore/compressedio" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/spf13/pflag" "go.uber.org/zap" ) @@ -636,7 +638,7 @@ func (s *AzureBlobStorage) DeleteFiles(ctx context.Context, names []string) erro } // Open implements the StorageReader interface. 
-func (s *AzureBlobStorage) Open(ctx context.Context, name string, o *ReaderOption) (FileReader, error) { +func (s *AzureBlobStorage) Open(ctx context.Context, name string, o *ReaderOption) (objectio.Reader, error) { client := s.containerClient.NewBlockBlobClient(s.withPrefix(name)) resp, err := client.GetProperties(ctx, nil) if err != nil { @@ -714,7 +716,7 @@ func (s *AzureBlobStorage) URI() string { const azblobChunkSize = 64 * 1024 * 1024 // Create implements the StorageWriter interface. -func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOption) (FileWriter, error) { +func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOption) (objectio.Writer, error) { client := s.containerClient.NewBlockBlobClient(s.withPrefix(name)) uploader := &azblobUploader{ blobClient: client, @@ -727,7 +729,7 @@ func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOptio cpkInfo: s.cpkInfo, } - uploaderWriter := newBufferedWriter(uploader, azblobChunkSize, NoCompression, nil) + uploaderWriter := objectio.NewBufferedWriter(uploader, azblobChunkSize, compressedio.NoCompression, nil) return uploaderWriter, nil } diff --git a/pkg/objstore/azblob_test.go b/pkg/objstore/azblob_test.go index 70933ae88a..390d4fe0ac 100644 --- a/pkg/objstore/azblob_test.go +++ b/pkg/objstore/azblob_test.go @@ -30,6 +30,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/stretchr/testify/require" ) @@ -464,7 +465,7 @@ func TestAzblobSeekToEndShouldNotError(t *testing.T) { } type wr struct { - w FileWriter + w objectio.Writer ctx context.Context } diff --git a/pkg/objstore/batch.go b/pkg/objstore/batch.go index 16ac9d004b..2d7c94b1fd 100644 --- a/pkg/objstore/batch.go +++ b/pkg/objstore/batch.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/errors" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/pkg/objstore/objectio" "go.uber.org/multierr" ) @@ -152,7 +153,7 @@ func (d *Batched) Rename(ctx context.Context, oldName, newName string) error { } // Create implements the Storage interface. 
-func (d *Batched) Create(ctx context.Context, path string, option *WriterOption) (FileWriter, error) { +func (d *Batched) Create(ctx context.Context, path string, option *WriterOption) (objectio.Writer, error) { return nil, errors.Annotatef(berrors.ErrStorageUnknown, "ExternalStorage.Create isn't allowed in batch mode for now.") } diff --git a/pkg/objstore/compress.go b/pkg/objstore/compress.go index a94dff6cb0..574379aca9 100644 --- a/pkg/objstore/compress.go +++ b/pkg/objstore/compress.go @@ -21,18 +21,20 @@ import ( "github.com/pingcap/errors" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/pkg/objstore/compressedio" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/pingcap/tidb/pkg/objstore/recording" ) type withCompression struct { Storage - compressType CompressType - decompressCfg DecompressConfig + compressType compressedio.CompressType + decompressCfg compressedio.DecompressConfig } // WithCompression returns an Storage with compress option -func WithCompression(inner Storage, compressionType CompressType, cfg DecompressConfig) Storage { - if compressionType == NoCompression { +func WithCompression(inner Storage, compressionType compressedio.CompressType, cfg compressedio.DecompressConfig) Storage { + if compressionType == compressedio.NoCompression { return inner } return &withCompression{ @@ -42,21 +44,21 @@ func WithCompression(inner Storage, compressionType CompressType, cfg Decompress } } -func (w *withCompression) Create(ctx context.Context, name string, o *WriterOption) (FileWriter, error) { +func (w *withCompression) Create(ctx context.Context, name string, o *WriterOption) (objectio.Writer, error) { writer, err := w.Storage.Create(ctx, name, o) if err != nil { return nil, errors.Trace(err) } // some implementation already wrap the writer, so we need to unwrap it - if bw, ok := writer.(*bufferedWriter); ok { - writer = bw.writer + if bw, ok := writer.(*objectio.BufferedWriter); ok { + writer = bw.GetWriter() } // the external storage will do access recording, so no need to pass it again. 
- compressedWriter := newBufferedWriter(writer, hardcodedS3ChunkSize, w.compressType, nil) + compressedWriter := objectio.NewBufferedWriter(writer, hardcodedS3ChunkSize, w.compressType, nil) return compressedWriter, nil } -func (w *withCompression) Open(ctx context.Context, path string, o *ReaderOption) (FileReader, error) { +func (w *withCompression) Open(ctx context.Context, path string, o *ReaderOption) (objectio.Reader, error) { fileReader, err := w.Storage.Open(ctx, path, o) if err != nil { return nil, errors.Trace(err) @@ -70,7 +72,7 @@ func (w *withCompression) Open(ctx context.Context, path string, o *ReaderOption func (w *withCompression) WriteFile(ctx context.Context, name string, data []byte) error { bf := bytes.NewBuffer(make([]byte, 0, len(data))) - compressBf := newCompressWriter(w.compressType, bf) + compressBf := compressedio.NewWriter(w.compressType, bf) _, err := compressBf.Write(data) if err != nil { return errors.Trace(err) @@ -88,7 +90,7 @@ func (w *withCompression) ReadFile(ctx context.Context, name string) ([]byte, er return data, errors.Trace(err) } bf := bytes.NewBuffer(data) - compressBf, err := newCompressReader(w.compressType, w.decompressCfg, bf) + compressBf, err := compressedio.NewReader(w.compressType, w.decompressCfg, bf) if err != nil { return nil, err } @@ -103,18 +105,18 @@ type compressReader struct { } // InterceptDecompressReader intercepts the reader and wraps it with a decompress -// reader on the given FileReader. Note that the returned -// FileReader does not have the property that Seek(0, io.SeekCurrent) +// reader on the given Reader. Note that the returned +// Reader does not have the property that Seek(0, io.SeekCurrent) // equals total bytes Read() if the decompress reader is used. func InterceptDecompressReader( - fileReader FileReader, - compressType CompressType, - cfg DecompressConfig, -) (FileReader, error) { - if compressType == NoCompression { + fileReader objectio.Reader, + compressType compressedio.CompressType, + cfg compressedio.DecompressConfig, +) (objectio.Reader, error) { + if compressType == compressedio.NoCompression { return fileReader, nil } - r, err := newCompressReader(compressType, cfg, fileReader) + r, err := compressedio.NewReader(compressType, cfg, fileReader) if err != nil { return nil, errors.Trace(err) } @@ -127,11 +129,11 @@ func InterceptDecompressReader( // NewLimitedInterceptReader creates a decompress reader with limit n. 
func NewLimitedInterceptReader( - fileReader FileReader, - compressType CompressType, - cfg DecompressConfig, + fileReader objectio.Reader, + compressType compressedio.CompressType, + cfg compressedio.DecompressConfig, n int64, -) (FileReader, error) { +) (objectio.Reader, error) { newFileReader := fileReader if n < 0 { return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support negative limit, n: %d", n) @@ -164,7 +166,7 @@ func (c *compressReader) GetFileSize() (int64, error) { type flushStorageWriter struct { writer io.Writer - flusher flusher + flusher compressedio.Flusher closer io.Closer accessRec *recording.AccessStats } @@ -183,7 +185,7 @@ func (w *flushStorageWriter) Close(_ context.Context) error { return w.closer.Close() } -func newFlushStorageWriter(writer io.Writer, flusher2 flusher, closer io.Closer, accessRec *recording.AccessStats) *flushStorageWriter { +func newFlushStorageWriter(writer io.Writer, flusher2 compressedio.Flusher, closer io.Closer, accessRec *recording.AccessStats) *flushStorageWriter { return &flushStorageWriter{ writer: writer, flusher: flusher2, diff --git a/pkg/objstore/compress_test.go b/pkg/objstore/compress_test.go index 1b5824e707..77293effb6 100644 --- a/pkg/objstore/compress_test.go +++ b/pkg/objstore/compress_test.go @@ -22,6 +22,7 @@ import ( "strings" "testing" + "github.com/pingcap/tidb/pkg/objstore/compressedio" "github.com/stretchr/testify/require" ) @@ -32,7 +33,7 @@ func TestWithCompressReadWriteFile(t *testing.T) { ctx := context.Background() storage, err := Create(ctx, backend, true) require.NoError(t, err) - storage = WithCompression(storage, Gzip, DecompressConfig{}) + storage = WithCompression(storage, compressedio.Gzip, compressedio.DecompressConfig{}) name := "with compress test" content := "hello,world!" fileName := strings.ReplaceAll(name, " ", "-") + ".txt.gz" @@ -42,7 +43,7 @@ func TestWithCompressReadWriteFile(t *testing.T) { // make sure compressed file is written correctly file, err := os.Open(filepath.Join(dir, fileName)) require.NoError(t, err) - uncompressedFile, err := newCompressReader(Gzip, DecompressConfig{}, file) + uncompressedFile, err := compressedio.NewReader(compressedio.Gzip, compressedio.DecompressConfig{}, file) require.NoError(t, err) newContent, err := io.ReadAll(uncompressedFile) require.NoError(t, err) diff --git a/pkg/objstore/compressedio/BUILD.bazel b/pkg/objstore/compressedio/BUILD.bazel new file mode 100644 index 0000000000..8be34b97b7 --- /dev/null +++ b/pkg/objstore/compressedio/BUILD.bazel @@ -0,0 +1,21 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "compressedio", + srcs = [ + "buffer.go", + "def.go", + "reader.go", + "writer.go", + ], + importpath = "github.com/pingcap/tidb/pkg/objstore/compressedio", + visibility = ["//visibility:public"], + deps = [ + "@com_github_klauspost_compress//gzip", + "@com_github_klauspost_compress//snappy", + "@com_github_klauspost_compress//zstd", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_log//:log", + "@org_uber_go_zap//:zap", + ], +) diff --git a/pkg/objstore/compressedio/buffer.go b/pkg/objstore/compressedio/buffer.go new file mode 100644 index 0000000000..05d33b1b77 --- /dev/null +++ b/pkg/objstore/compressedio/buffer.go @@ -0,0 +1,74 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compressedio + +import ( + "bytes" + + "github.com/pingcap/errors" +) + +// Buffer a compressed buffer. +type Buffer struct { + *bytes.Buffer + writer Writer + cap int +} + +// Write implements objectio.interceptBuffer. +func (b *Buffer) Write(p []byte) (int, error) { + written, err := b.writer.Write(p) + return written, errors.Trace(err) +} + +// Len implements objectio.interceptBuffer. +func (b *Buffer) Len() int { + return b.Buffer.Len() +} + +// Cap implements objectio.interceptBuffer. +func (b *Buffer) Cap() int { + return b.cap +} + +// Reset implements objectio.interceptBuffer. +func (b *Buffer) Reset() { + b.Buffer.Reset() +} + +// Flush implements objectio.interceptBuffer. +func (b *Buffer) Flush() error { + return b.writer.Flush() +} + +// Close implements objectio.interceptBuffer. +func (b *Buffer) Close() error { + return b.writer.Close() +} + +// Compressed implements objectio.interceptBuffer. +func (*Buffer) Compressed() bool { + return true +} + +// NewBuffer creates a new Buffer. +func NewBuffer(chunkSize int, compressType CompressType) *Buffer { + bf := bytes.NewBuffer(make([]byte, 0, chunkSize)) + return &Buffer{ + Buffer: bf, + cap: chunkSize, + writer: NewWriter(compressType, bf), + } +} diff --git a/pkg/objstore/compressedio/def.go b/pkg/objstore/compressedio/def.go new file mode 100644 index 0000000000..ac7ba8690f --- /dev/null +++ b/pkg/objstore/compressedio/def.go @@ -0,0 +1,75 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compressedio + +import "github.com/pingcap/errors" + +// CompressType represents the type of compression. +type CompressType uint8 + +const ( + // NoCompression won't compress given bytes. + NoCompression CompressType = iota + // Gzip will compress given bytes in gzip format. + Gzip + // Snappy will compress given bytes in snappy format. + Snappy + // Zstd will compress given bytes in zstd format. + Zstd +) + +// FileSuffix returns the file suffix for the compress type. +func (ct CompressType) FileSuffix() string { + switch ct { + case NoCompression: + return "" + case Gzip: + return ".gz" + case Snappy: + return ".snappy" + case Zstd: + return ".zst" + default: + return "" + } +} + +// Flusher flush interface. +type Flusher interface { + Flush() error +} + +// DecompressConfig is the config used for decompression. +type DecompressConfig struct { + // ZStdDecodeConcurrency only used for ZStd decompress, see WithDecoderConcurrency. + // if not 1, ZStd will decode file asynchronously. 
+ ZStdDecodeConcurrency int +} + +// ParseCompressType parses a compress type string into a CompressType. +func ParseCompressType(compressType string) (CompressType, error) { + switch compressType { + case "", "no-compression": + return NoCompression, nil + case "gzip", "gz": + return Gzip, nil + case "snappy": + return Snappy, nil + case "zstd", "zst": + return Zstd, nil + default: + return NoCompression, errors.Errorf("unknown compress type %s", compressType) + } +} diff --git a/pkg/objstore/compressedio/reader.go b/pkg/objstore/compressedio/reader.go new file mode 100644 index 0000000000..7b3affa5ad --- /dev/null +++ b/pkg/objstore/compressedio/reader.go @@ -0,0 +1,42 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compressedio + +import ( + "io" + + "github.com/klauspost/compress/gzip" + "github.com/klauspost/compress/snappy" + "github.com/klauspost/compress/zstd" +) + +// NewReader wraps r with a reader that decompresses data of the given compress type. +// It is currently only used in tests. +func NewReader(compressType CompressType, cfg DecompressConfig, r io.Reader) (io.Reader, error) { + switch compressType { + case Gzip: + return gzip.NewReader(r) + case Snappy: + return snappy.NewReader(r), nil + case Zstd: + options := []zstd.DOption{} + if cfg.ZStdDecodeConcurrency > 0 { + options = append(options, zstd.WithDecoderConcurrency(cfg.ZStdDecodeConcurrency)) + } + return zstd.NewReader(r, options...) + default: + return nil, nil + } +} diff --git a/pkg/objstore/compressedio/writer.go b/pkg/objstore/compressedio/writer.go new file mode 100644 index 0000000000..964348c6d2 --- /dev/null +++ b/pkg/objstore/compressedio/writer.go @@ -0,0 +1,49 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compressedio + +import ( + "io" + + "github.com/klauspost/compress/gzip" + "github.com/klauspost/compress/snappy" + "github.com/klauspost/compress/zstd" + "github.com/pingcap/log" + "go.uber.org/zap" +) + +// Writer is a compressed writer interface.
+type Writer interface { + io.WriteCloser + Flusher +} + +// NewWriter creates a compress writer +func NewWriter(compressType CompressType, w io.Writer) Writer { + switch compressType { + case Gzip: + return gzip.NewWriter(w) + case Snappy: + return snappy.NewBufferedWriter(w) + case Zstd: + newWriter, err := zstd.NewWriter(w) + if err != nil { + log.Warn("Met error when creating new writer for Zstd type file", zap.Error(err)) + } + return newWriter + default: + return nil + } +} diff --git a/pkg/objstore/gcs.go b/pkg/objstore/gcs.go index 85a57888cb..52f29c9c60 100644 --- a/pkg/objstore/gcs.go +++ b/pkg/objstore/gcs.go @@ -33,6 +33,8 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/pkg/objstore/compressedio" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/pingcap/tidb/pkg/objstore/recording" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/intest" @@ -267,7 +269,7 @@ func (s *GCSStorage) FileExists(ctx context.Context, name string) (bool, error) } // Open a Reader by file path. -func (s *GCSStorage) Open(ctx context.Context, path string, o *ReaderOption) (FileReader, error) { +func (s *GCSStorage) Open(ctx context.Context, path string, o *ReaderOption) (objectio.Reader, error) { object := s.objectName(path) handle := s.GetBucketHandle().Object(object) @@ -360,7 +362,7 @@ func (s *GCSStorage) URI() string { } // Create implements Storage interface. -func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption) (FileWriter, error) { +func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption) (objectio.Writer, error) { // NewGCSWriter requires real testing environment on Google Cloud. mockGCS := intest.InTest && strings.Contains(s.gcs.GetEndpoint(), "127.0.0.1") if wo == nil || wo.Concurrency <= 1 || mockGCS { @@ -368,7 +370,7 @@ func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption) wc := s.GetBucketHandle().Object(object).NewWriter(ctx) wc.StorageClass = s.gcs.StorageClass wc.PredefinedACL = s.gcs.PredefinedAcl - return newFlushStorageWriter(wc, &emptyFlusher{}, wc, s.accessRec), nil + return newFlushStorageWriter(wc, &objectio.EmptyFlusher{}, wc, s.accessRec), nil } uri := s.objectName(name) // 5MB is the minimum part size for GCS. @@ -377,9 +379,9 @@ func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption) if err != nil { return nil, errors.Trace(err) } - fw := newFlushStorageWriter(w, &emptyFlusher{}, w, s.accessRec) + fw := newFlushStorageWriter(w, &objectio.EmptyFlusher{}, w, s.accessRec) // we already pass the accessRec to flushStorageWriter. - bw := newBufferedWriter(fw, int(partSize), NoCompression, nil) + bw := objectio.NewBufferedWriter(fw, int(partSize), compressedio.NoCompression, nil) return bw, nil } diff --git a/pkg/objstore/hdfs.go b/pkg/objstore/hdfs.go index 4e7392ab0d..ec394f0501 100644 --- a/pkg/objstore/hdfs.go +++ b/pkg/objstore/hdfs.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/errors" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/pkg/objstore/objectio" ) // HDFSStorage represents HDFS storage. @@ -120,7 +121,7 @@ func (*HDFSStorage) DeleteFiles(_ context.Context, _ []string) error { } // Open a Reader by file path. 
path is relative path to storage base path -func (*HDFSStorage) Open(_ context.Context, _ string, _ *ReaderOption) (FileReader, error) { +func (*HDFSStorage) Open(_ context.Context, _ string, _ *ReaderOption) (objectio.Reader, error) { return nil, errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup") } @@ -140,7 +141,7 @@ func (s *HDFSStorage) URI() string { } // Create opens a file writer by path. path is relative path to storage base path -func (*HDFSStorage) Create(_ context.Context, _ string, _ *WriterOption) (FileWriter, error) { +func (*HDFSStorage) Create(_ context.Context, _ string, _ *WriterOption) (objectio.Writer, error) { return nil, errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup") } diff --git a/pkg/objstore/ks3.go b/pkg/objstore/ks3.go index f85a747b27..fa39ac90c6 100644 --- a/pkg/objstore/ks3.go +++ b/pkg/objstore/ks3.go @@ -35,6 +35,8 @@ import ( berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/objstore/compressedio" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/pingcap/tidb/pkg/objstore/recording" "github.com/pingcap/tidb/pkg/util/injectfailpoint" "github.com/pingcap/tidb/pkg/util/prefetch" @@ -445,7 +447,7 @@ func (rs *KS3Storage) URI() string { } // Open a Reader by file path. -func (rs *KS3Storage) Open(ctx context.Context, path string, o *ReaderOption) (FileReader, error) { +func (rs *KS3Storage) Open(ctx context.Context, path string, o *ReaderOption) (objectio.Reader, error) { start := int64(0) end := int64(0) prefetchSize := 0 @@ -678,7 +680,7 @@ func (r *ks3ObjectReader) GetFileSize() (int64, error) { } // createUploader create multi upload request. -func (rs *KS3Storage) createUploader(ctx context.Context, name string) (FileWriter, error) { +func (rs *KS3Storage) createUploader(ctx context.Context, name string) (objectio.Writer, error) { input := &s3.CreateMultipartUploadInput{ Bucket: aws.String(rs.options.Bucket), Key: aws.String(rs.options.Prefix + name), @@ -708,8 +710,8 @@ func (rs *KS3Storage) createUploader(ctx context.Context, name string) (FileWrit } // Create creates multi upload request. 
-func (rs *KS3Storage) Create(ctx context.Context, name string, option *WriterOption) (FileWriter, error) { - var uploader FileWriter +func (rs *KS3Storage) Create(ctx context.Context, name string, option *WriterOption) (objectio.Writer, error) { + var uploader objectio.Writer var err error if option == nil || option.Concurrency <= 1 { uploader, err = rs.createUploader(ctx, name) @@ -747,7 +749,7 @@ func (rs *KS3Storage) Create(ctx context.Context, name string, option *WriterOpt if option != nil && option.PartSize > 0 { bufSize = int(option.PartSize) } - uploaderWriter := newBufferedWriter(uploader, bufSize, NoCompression, rs.accessRec) + uploaderWriter := objectio.NewBufferedWriter(uploader, bufSize, compressedio.NoCompression, rs.accessRec) return uploaderWriter, nil } diff --git a/pkg/objstore/local.go b/pkg/objstore/local.go index c468fa9d2a..15b9f52877 100644 --- a/pkg/objstore/local.go +++ b/pkg/objstore/local.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/pkg/objstore/objectio" "go.uber.org/zap" ) @@ -209,7 +210,7 @@ func (l *LocalStorage) URI() string { } // Open a Reader by file path, path is a relative path to base path. -func (l *LocalStorage) Open(_ context.Context, path string, o *ReaderOption) (FileReader, error) { +func (l *LocalStorage) Open(_ context.Context, path string, o *ReaderOption) (objectio.Reader, error) { //nolint: gosec f, err := os.Open(filepath.Join(l.base, path)) if err != nil { @@ -273,7 +274,7 @@ func (f *localFile) GetFileSize() (int64, error) { } // Create implements Storage interface. -func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (FileWriter, error) { +func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (objectio.Writer, error) { filename := filepath.Join(l.base, name) dir := filepath.Dir(filename) err := os.MkdirAll(dir, 0750) diff --git a/pkg/objstore/local_test.go b/pkg/objstore/local_test.go index f700b87bd0..c6e4827f78 100644 --- a/pkg/objstore/local_test.go +++ b/pkg/objstore/local_test.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/stretchr/testify/require" ) @@ -199,7 +200,7 @@ func TestLocalFileReadRange(t *testing.T) { require.NoError(t, err) require.NoError(t, w.Close(ctx)) - checkContent := func(r FileReader, expected string) { + checkContent := func(r objectio.Reader, expected string) { buf := make([]byte, 10) n, _ := r.Read(buf) require.Equal(t, expected, string(buf[:n])) diff --git a/pkg/objstore/memstore.go b/pkg/objstore/memstore.go index ba41362bd9..48d22b5489 100644 --- a/pkg/objstore/memstore.go +++ b/pkg/objstore/memstore.go @@ -25,6 +25,7 @@ import ( "sync" "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/objstore/objectio" "go.uber.org/atomic" ) @@ -146,7 +147,7 @@ func (s *MemStorage) FileExists(ctx context.Context, name string) (bool, error) // Open opens a Reader by file path. // It implements the `Storage` interface -func (s *MemStorage) Open(ctx context.Context, filePath string, o *ReaderOption) (FileReader, error) { +func (s *MemStorage) Open(ctx context.Context, filePath string, o *ReaderOption) (objectio.Reader, error) { if err := ctx.Err(); err != nil { return nil, errors.Trace(err) } @@ -227,7 +228,7 @@ func (*MemStorage) URI() string { // Create creates a file and returning a writer to write data into. 
// When the writer is closed, the data is stored in the file. // It implements the `Storage` interface -func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (FileWriter, error) { +func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (objectio.Writer, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -312,7 +313,7 @@ type memFileWriter struct { } // Write writes the data into the mem storage file buffer. -// It implements the `FileWriter` interface +// It implements the `Writer` interface func (w *memFileWriter) Write(ctx context.Context, p []byte) (int, error) { select { case <-ctx.Done(): diff --git a/pkg/objstore/mockobjstore/BUILD.bazel b/pkg/objstore/mockobjstore/BUILD.bazel index d75ddcd718..50d271a221 100644 --- a/pkg/objstore/mockobjstore/BUILD.bazel +++ b/pkg/objstore/mockobjstore/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/objstore", + "//pkg/objstore/objectio", "@org_uber_go_mock//gomock", ], ) diff --git a/pkg/objstore/mockobjstore/objstore_mock.go b/pkg/objstore/mockobjstore/objstore_mock.go index 484c162b5c..361311be13 100644 --- a/pkg/objstore/mockobjstore/objstore_mock.go +++ b/pkg/objstore/mockobjstore/objstore_mock.go @@ -14,6 +14,7 @@ import ( reflect "reflect" objstore "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/objectio" gomock "go.uber.org/mock/gomock" ) @@ -58,10 +59,10 @@ func (mr *MockStorageMockRecorder) Close() *gomock.Call { } // Create mocks base method. -func (m *MockStorage) Create(arg0 context.Context, arg1 string, arg2 *objstore.WriterOption) (objstore.FileWriter, error) { +func (m *MockStorage) Create(arg0 context.Context, arg1 string, arg2 *objstore.WriterOption) (objectio.Writer, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Create", arg0, arg1, arg2) - ret0, _ := ret[0].(objstore.FileWriter) + ret0, _ := ret[0].(objectio.Writer) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -116,10 +117,10 @@ func (mr *MockStorageMockRecorder) FileExists(arg0, arg1 any) *gomock.Call { } // Open mocks base method. -func (m *MockStorage) Open(arg0 context.Context, arg1 string, arg2 *objstore.ReaderOption) (objstore.FileReader, error) { +func (m *MockStorage) Open(arg0 context.Context, arg1 string, arg2 *objstore.ReaderOption) (objectio.Reader, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Open", arg0, arg1, arg2) - ret0, _ := ret[0].(objstore.FileReader) + ret0, _ := ret[0].(objectio.Reader) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/pkg/objstore/noop.go b/pkg/objstore/noop.go index 2acef3efa7..ead5250337 100644 --- a/pkg/objstore/noop.go +++ b/pkg/objstore/noop.go @@ -16,6 +16,8 @@ package objstore import ( "context" + + "github.com/pingcap/tidb/pkg/objstore/objectio" ) type noopStorage struct{} @@ -46,7 +48,7 @@ func (*noopStorage) FileExists(_ context.Context, _ string) (bool, error) { } // Open a Reader by file path. -func (*noopStorage) Open(_ context.Context, _ string, _ *ReaderOption) (FileReader, error) { +func (*noopStorage) Open(_ context.Context, _ string, _ *ReaderOption) (objectio.Reader, error) { return noopReader{}, nil } @@ -60,7 +62,7 @@ func (*noopStorage) URI() string { } // Create implements Storage interface. 
-func (*noopStorage) Create(_ context.Context, _ string, _ *WriterOption) (FileWriter, error) { +func (*noopStorage) Create(_ context.Context, _ string, _ *WriterOption) (objectio.Writer, error) { return &NoopWriter{}, nil } @@ -97,12 +99,12 @@ func (noopReader) GetFileSize() (int64, error) { // NoopWriter is a writer that does nothing. type NoopWriter struct{} -// Write implements FileWriter interface. +// Write implements objectio.Writer interface. func (NoopWriter) Write(_ context.Context, p []byte) (int, error) { return len(p), nil } -// Close implements FileWriter interface. +// Close implements objectio.Writer interface. func (NoopWriter) Close(_ context.Context) error { return nil } diff --git a/pkg/objstore/objectio/BUILD.bazel b/pkg/objstore/objectio/BUILD.bazel new file mode 100644 index 0000000000..9aeaaaa506 --- /dev/null +++ b/pkg/objstore/objectio/BUILD.bazel @@ -0,0 +1,30 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "objectio", + srcs = [ + "interface.go", + "writer.go", + ], + importpath = "github.com/pingcap/tidb/pkg/objstore/objectio", + visibility = ["//visibility:public"], + deps = [ + "//pkg/objstore/compressedio", + "//pkg/objstore/recording", + "@com_github_pingcap_errors//:errors", + ], +) + +go_test( + name = "objectio_test", + timeout = "short", + srcs = ["writer_test.go"], + flaky = True, + shard_count = 3, + deps = [ + "//pkg/objstore", + "//pkg/objstore/compressedio", + "@com_github_klauspost_compress//zstd", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/objstore/objectio/interface.go b/pkg/objstore/objectio/interface.go new file mode 100644 index 0000000000..8d906f1075 --- /dev/null +++ b/pkg/objstore/objectio/interface.go @@ -0,0 +1,35 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package objectio + +import ( + "context" + "io" +) + +// Reader represents the streaming external file reader. +type Reader interface { + io.ReadSeekCloser + // GetFileSize returns the file size. + GetFileSize() (int64, error) +} + +// Writer represents the streaming external file writer. +type Writer interface { + // Write writes to buffer and if chunk is filled will upload it + Write(ctx context.Context, p []byte) (int, error) + // Close writes final chunk and completes the upload + Close(ctx context.Context) error +} diff --git a/pkg/objstore/objectio/writer.go b/pkg/objstore/objectio/writer.go new file mode 100644 index 0000000000..0e9f697a04 --- /dev/null +++ b/pkg/objstore/objectio/writer.go @@ -0,0 +1,155 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package objectio + +import ( + "bytes" + "context" + "io" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/objstore/compressedio" + "github.com/pingcap/tidb/pkg/objstore/recording" +) + +// EmptyFlusher empty Flusher. +type EmptyFlusher struct{} + +// Flush do flush. +func (*EmptyFlusher) Flush() error { + return nil +} + +type interceptBuffer interface { + io.WriteCloser + compressedio.Flusher + Len() int + Cap() int + Bytes() []byte + Reset() + Compressed() bool +} + +func newInterceptBuffer(chunkSize int, compressType compressedio.CompressType) interceptBuffer { + if compressType == compressedio.NoCompression { + return newPlainBuffer(chunkSize) + } + return compressedio.NewBuffer(chunkSize, compressType) +} + +type plainBuffer struct { + *bytes.Buffer +} + +func (*plainBuffer) Flush() error { + return nil +} + +func (*plainBuffer) Close() error { + return nil +} + +func (*plainBuffer) Compressed() bool { + return false +} + +func newPlainBuffer(chunkSize int) *plainBuffer { + return &plainBuffer{bytes.NewBuffer(make([]byte, 0, chunkSize))} +} + +// BufferedWriter is a buffered writer +type BufferedWriter struct { + buf interceptBuffer + writer Writer + accessRec *recording.AccessStats +} + +// Write implements Writer. +func (u *BufferedWriter) Write(ctx context.Context, p []byte) (int, error) { + n, err := u.write0(ctx, p) + u.accessRec.RecWrite(n) + return n, errors.Trace(err) +} + +func (u *BufferedWriter) write0(ctx context.Context, p []byte) (int, error) { + bytesWritten := 0 + for u.buf.Len()+len(p) > u.buf.Cap() { + // We won't fit p in this chunk + + // Is this chunk full? + chunkToFill := u.buf.Cap() - u.buf.Len() + if chunkToFill > 0 { + // It's not full so we write enough of p to fill it + prewrite := p[0:chunkToFill] + w, err := u.buf.Write(prewrite) + bytesWritten += w + if err != nil { + return bytesWritten, errors.Trace(err) + } + p = p[w:] + // continue buf because compressed data size may be less than Cap - Len + if u.buf.Compressed() { + continue + } + } + _ = u.buf.Flush() + err := u.uploadChunk(ctx) + if err != nil { + return 0, errors.Trace(err) + } + } + w, err := u.buf.Write(p) + bytesWritten += w + return bytesWritten, errors.Trace(err) +} + +func (u *BufferedWriter) uploadChunk(ctx context.Context) error { + if u.buf.Len() == 0 { + return nil + } + b := u.buf.Bytes() + u.buf.Reset() + _, err := u.writer.Write(ctx, b) + return errors.Trace(err) +} + +// Close implements Writer. +func (u *BufferedWriter) Close(ctx context.Context) error { + u.buf.Close() + err := u.uploadChunk(ctx) + if err != nil { + return errors.Trace(err) + } + return u.writer.Close(ctx) +} + +// GetWriter get the underlying writer. +func (u *BufferedWriter) GetWriter() Writer { + return u.writer +} + +// NewUploaderWriter wraps the Writer interface over an uploader. +func NewUploaderWriter(writer Writer, chunkSize int, compressType compressedio.CompressType) Writer { + return NewBufferedWriter(writer, chunkSize, compressType, nil) +} + +// NewBufferedWriter is used to build a buffered writer. 
+func NewBufferedWriter(writer Writer, chunkSize int, compressType compressedio.CompressType, accessRec *recording.AccessStats) *BufferedWriter { + return &BufferedWriter{ + writer: writer, + buf: newInterceptBuffer(chunkSize, compressType), + accessRec: accessRec, + } +} diff --git a/pkg/objstore/writer_test.go b/pkg/objstore/objectio/writer_test.go similarity index 69% rename from pkg/objstore/writer_test.go rename to pkg/objstore/objectio/writer_test.go index 01d402460d..b749641fa9 100644 --- a/pkg/objstore/writer_test.go +++ b/pkg/objstore/objectio/writer_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package objstore +package objectio_test import ( "bytes" @@ -25,9 +25,36 @@ import ( "testing" "github.com/klauspost/compress/zstd" + "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/objstore/compressedio" "github.com/stretchr/testify/require" ) +func getStore(t *testing.T, uri string, changeStoreFn func(s objstore.Storage) objstore.Storage) objstore.Storage { + t.Helper() + backend, err := objstore.ParseBackend(uri, nil) + require.NoError(t, err) + ctx := context.Background() + storage, err := objstore.Create(ctx, backend, true) + require.NoError(t, err) + return changeStoreFn(storage) +} + +func writeFile(t *testing.T, storage objstore.Storage, fileName string, lines []string) { + t.Helper() + ctx := context.Background() + writer, err := storage.Create(ctx, fileName, nil) + require.NoError(t, err) + for _, str := range lines { + p := []byte(str) + written, err2 := writer.Write(ctx, p) + require.Nil(t, err2) + require.Len(t, p, written) + } + err = writer.Close(ctx) + require.NoError(t, err) +} + func TestExternalFileWriter(t *testing.T) { dir := t.TempDir() @@ -37,22 +64,11 @@ func TestExternalFileWriter(t *testing.T) { } testFn := func(test *testcase, t *testing.T) { t.Log(test.name) - backend, err := ParseBackend("local://"+filepath.ToSlash(dir), nil) - require.NoError(t, err) - ctx := context.Background() - storage, err := Create(ctx, backend, true) - require.NoError(t, err) + storage := getStore(t, "local://"+filepath.ToSlash(dir), func(s objstore.Storage) objstore.Storage { + return s + }) fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt" - writer, err := storage.Create(ctx, fileName, nil) - require.NoError(t, err) - for _, str := range test.content { - p := []byte(str) - written, err2 := writer.Write(ctx, p) - require.Nil(t, err2) - require.Len(t, p, written) - } - err = writer.Close(ctx) - require.NoError(t, err) + writeFile(t, storage, fileName, test.content) content, err := os.ReadFile(filepath.Join(dir, fileName)) require.NoError(t, err) require.Equal(t, strings.Join(test.content, ""), string(content)) @@ -107,33 +123,21 @@ func TestCompressReaderWriter(t *testing.T) { type testcase struct { name string content []string - compressType CompressType + compressType compressedio.CompressType } testFn := func(test *testcase, t *testing.T) { t.Log(test.name) - backend, err := ParseBackend("local://"+filepath.ToSlash(dir), nil) - require.NoError(t, err) - ctx := context.Background() - storage, err := Create(ctx, backend, true) - require.NoError(t, err) - storage = WithCompression(storage, test.compressType, DecompressConfig{}) - suffix := createSuffixString(test.compressType) + suffix := test.compressType.FileSuffix() fileName := strings.ReplaceAll(test.name, " ", "-") + suffix - writer, err := storage.Create(ctx, fileName, nil) - require.NoError(t, err) - for _, str 
:= range test.content { - p := []byte(str) - written, err2 := writer.Write(ctx, p) - require.Nil(t, err2) - require.Len(t, p, written) - } - err = writer.Close(ctx) - require.NoError(t, err) + storage := getStore(t, "local://"+filepath.ToSlash(dir), func(s objstore.Storage) objstore.Storage { + return objstore.WithCompression(s, test.compressType, compressedio.DecompressConfig{}) + }) + writeFile(t, storage, fileName, test.content) // make sure compressed file is written correctly file, err := os.Open(filepath.Join(dir, fileName)) require.NoError(t, err) - r, err := newCompressReader(test.compressType, DecompressConfig{}, file) + r, err := compressedio.NewReader(test.compressType, compressedio.DecompressConfig{}, file) require.NoError(t, err) var bf bytes.Buffer _, err = bf.ReadFrom(r) @@ -141,6 +145,7 @@ func TestCompressReaderWriter(t *testing.T) { require.Equal(t, strings.Join(test.content, ""), bf.String()) // test withCompression Open + ctx := context.Background() r, err = storage.Open(ctx, fileName, nil) require.NoError(t, err) content, err := io.ReadAll(r) @@ -149,7 +154,7 @@ func TestCompressReaderWriter(t *testing.T) { require.Nil(t, file.Close()) } - compressTypeArr := []CompressType{Gzip, Snappy, Zstd} + compressTypeArr := []compressedio.CompressType{compressedio.Gzip, compressedio.Snappy, compressedio.Zstd} tests := []testcase{ { @@ -196,7 +201,7 @@ func TestNewCompressReader(t *testing.T) { // default cfg(decode asynchronously) prevRoutineCnt := runtime.NumGoroutine() - r, err := newCompressReader(Zstd, DecompressConfig{}, bytes.NewReader(compressedData)) + r, err := compressedio.NewReader(compressedio.Zstd, compressedio.DecompressConfig{}, bytes.NewReader(compressedData)) currRoutineCnt := runtime.NumGoroutine() require.NoError(t, err) require.Greater(t, currRoutineCnt, prevRoutineCnt) @@ -206,8 +211,8 @@ func TestNewCompressReader(t *testing.T) { // sync decode prevRoutineCnt = runtime.NumGoroutine() - config := DecompressConfig{ZStdDecodeConcurrency: 1} - r, err = newCompressReader(Zstd, config, bytes.NewReader(compressedData)) + config := compressedio.DecompressConfig{ZStdDecodeConcurrency: 1} + r, err = compressedio.NewReader(compressedio.Zstd, config, bytes.NewReader(compressedData)) require.NoError(t, err) currRoutineCnt = runtime.NumGoroutine() require.Equal(t, prevRoutineCnt, currRoutineCnt) diff --git a/pkg/objstore/s3.go b/pkg/objstore/s3.go index 3ca4713e34..9d6e73446c 100644 --- a/pkg/objstore/s3.go +++ b/pkg/objstore/s3.go @@ -52,6 +52,8 @@ import ( berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/objstore/compressedio" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/pingcap/tidb/pkg/objstore/recording" "github.com/pingcap/tidb/pkg/util/injectfailpoint" "github.com/pingcap/tidb/pkg/util/prefetch" @@ -1024,7 +1026,7 @@ func (rs *S3Storage) URI() string { } // Open a Reader by file path. -func (rs *S3Storage) Open(ctx context.Context, path string, o *ReaderOption) (FileReader, error) { +func (rs *S3Storage) Open(ctx context.Context, path string, o *ReaderOption) (objectio.Reader, error) { start := int64(0) end := int64(0) prefetchSize := 0 @@ -1318,7 +1320,7 @@ func (r *s3ObjectReader) GetFileSize() (int64, error) { } // createUploader create multi upload request. 
-func (rs *S3Storage) createUploader(ctx context.Context, name string) (FileWriter, error) { +func (rs *S3Storage) createUploader(ctx context.Context, name string) (objectio.Writer, error) { input := &s3.CreateMultipartUploadInput{ Bucket: aws.String(rs.options.Bucket), Key: aws.String(rs.options.Prefix + name), @@ -1369,8 +1371,8 @@ func (s *s3ObjectWriter) Close(_ context.Context) error { } // Create creates multi upload request. -func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (FileWriter, error) { - var uploader FileWriter +func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (objectio.Writer, error) { + var uploader objectio.Writer var err error if option == nil || option.Concurrency <= 1 { uploader, err = rs.createUploader(ctx, name) @@ -1407,7 +1409,7 @@ func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOpti if option != nil && option.PartSize > 0 { bufSize = int(option.PartSize) } - uploaderWriter := newBufferedWriter(uploader, bufSize, NoCompression, rs.accessRec) + uploaderWriter := objectio.NewBufferedWriter(uploader, bufSize, compressedio.NoCompression, rs.accessRec) return uploaderWriter, nil } diff --git a/pkg/objstore/storage.go b/pkg/objstore/storage.go index bd34a3814a..a8b124b907 100644 --- a/pkg/objstore/storage.go +++ b/pkg/objstore/storage.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/pingcap/tidb/pkg/objstore/recording" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" @@ -106,14 +107,6 @@ type Uploader interface { CompleteUpload(ctx context.Context) error } -// Writer is like io.Writer but with Context, create a new writer on top of Uploader with NewUploaderWriter. -type Writer interface { - // Write writes to buffer and if chunk is filled will upload it - Write(ctx context.Context, p []byte) (int, error) - // Close writes final chunk and completes the upload - Close(ctx context.Context) error -} - // WriterOption writer option. type WriterOption struct { Concurrency int @@ -154,7 +147,7 @@ type Storage interface { DeleteFile(ctx context.Context, name string) error // Open a Reader by file path. path is relative path to storage base path. // Some implementation will use the given ctx as the inner context of the reader. - Open(ctx context.Context, path string, option *ReaderOption) (FileReader, error) + Open(ctx context.Context, path string, option *ReaderOption) (objectio.Reader, error) // DeleteFiles delete the files in storage DeleteFiles(ctx context.Context, names []string) error // WalkDir traverse all the files in a dir. @@ -171,28 +164,13 @@ type Storage interface { // Create opens a file writer by path. path is relative path to storage base // path. The old file under same path will be overwritten. Currently only s3 // implemented WriterOption. - Create(ctx context.Context, path string, option *WriterOption) (FileWriter, error) + Create(ctx context.Context, path string, option *WriterOption) (objectio.Writer, error) // Rename file name from oldFileName to newFileName Rename(ctx context.Context, oldFileName, newFileName string) error // Close release the resources of the storage. Close() } -// FileReader represents the streaming external file reader. -type FileReader interface { - io.ReadSeekCloser - // GetFileSize returns the file size. 
- GetFileSize() (int64, error) -} - -// FileWriter represents the streaming external file writer. -type FileWriter interface { - // Write writes to buffer and if chunk is filled will upload it - Write(ctx context.Context, p []byte) (int, error) - // Close writes final chunk and completes the upload - Close(ctx context.Context) error -} - // Options are backend-independent options provided to New. type Options struct { // SendCredentials marks whether to send credentials downstream. diff --git a/pkg/objstore/writer.go b/pkg/objstore/writer.go deleted file mode 100644 index ebda1d6ba3..0000000000 --- a/pkg/objstore/writer.go +++ /dev/null @@ -1,308 +0,0 @@ -// Copyright 2026 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package objstore - -import ( - "bytes" - "context" - "io" - - "github.com/klauspost/compress/gzip" - "github.com/klauspost/compress/snappy" - "github.com/klauspost/compress/zstd" - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tidb/pkg/objstore/recording" - "go.uber.org/zap" -) - -// CompressType represents the type of compression. -type CompressType uint8 - -const ( - // NoCompression won't compress given bytes. - NoCompression CompressType = iota - // Gzip will compress given bytes in gzip format. - Gzip - // Snappy will compress given bytes in snappy format. - Snappy - // Zstd will compress given bytes in zstd format. - Zstd -) - -// DecompressConfig is the config used for decompression. -type DecompressConfig struct { - // ZStdDecodeConcurrency only used for ZStd decompress, see WithDecoderConcurrency. - // if not 1, ZStd will decode file asynchronously. 
- ZStdDecodeConcurrency int -} - -type flusher interface { - Flush() error -} - -type emptyFlusher struct{} - -func (*emptyFlusher) Flush() error { - return nil -} - -type interceptBuffer interface { - io.WriteCloser - flusher - Len() int - Cap() int - Bytes() []byte - Reset() - Compressed() bool -} - -func createSuffixString(compressType CompressType) string { - txtSuffix := ".txt" - switch compressType { - case Gzip: - txtSuffix += ".gz" - case Snappy: - txtSuffix += ".snappy" - case Zstd: - txtSuffix += ".zst" - default: - return "" - } - return txtSuffix -} - -func newInterceptBuffer(chunkSize int, compressType CompressType) interceptBuffer { - if compressType == NoCompression { - return newNoCompressionBuffer(chunkSize) - } - return newSimpleCompressBuffer(chunkSize, compressType) -} - -func newCompressWriter(compressType CompressType, w io.Writer) simpleCompressWriter { - switch compressType { - case Gzip: - return gzip.NewWriter(w) - case Snappy: - return snappy.NewBufferedWriter(w) - case Zstd: - newWriter, err := zstd.NewWriter(w) - if err != nil { - log.Warn("Met error when creating new writer for Zstd type file", zap.Error(err)) - } - return newWriter - default: - return nil - } -} - -func newCompressReader(compressType CompressType, cfg DecompressConfig, r io.Reader) (io.Reader, error) { - switch compressType { - case Gzip: - return gzip.NewReader(r) - case Snappy: - return snappy.NewReader(r), nil - case Zstd: - options := []zstd.DOption{} - if cfg.ZStdDecodeConcurrency > 0 { - options = append(options, zstd.WithDecoderConcurrency(cfg.ZStdDecodeConcurrency)) - } - return zstd.NewReader(r, options...) - default: - return nil, nil - } -} - -type noCompressionBuffer struct { - *bytes.Buffer -} - -func (*noCompressionBuffer) Flush() error { - return nil -} - -func (*noCompressionBuffer) Close() error { - return nil -} - -func (*noCompressionBuffer) Compressed() bool { - return false -} - -func newNoCompressionBuffer(chunkSize int) *noCompressionBuffer { - return &noCompressionBuffer{bytes.NewBuffer(make([]byte, 0, chunkSize))} -} - -type simpleCompressWriter interface { - io.WriteCloser - flusher -} - -type simpleCompressBuffer struct { - *bytes.Buffer - compressWriter simpleCompressWriter - cap int -} - -func (b *simpleCompressBuffer) Write(p []byte) (int, error) { - written, err := b.compressWriter.Write(p) - return written, errors.Trace(err) -} - -func (b *simpleCompressBuffer) Len() int { - return b.Buffer.Len() -} - -func (b *simpleCompressBuffer) Cap() int { - return b.cap -} - -func (b *simpleCompressBuffer) Reset() { - b.Buffer.Reset() -} - -func (b *simpleCompressBuffer) Flush() error { - return b.compressWriter.Flush() -} - -func (b *simpleCompressBuffer) Close() error { - return b.compressWriter.Close() -} - -func (*simpleCompressBuffer) Compressed() bool { - return true -} - -func newSimpleCompressBuffer(chunkSize int, compressType CompressType) *simpleCompressBuffer { - bf := bytes.NewBuffer(make([]byte, 0, chunkSize)) - return &simpleCompressBuffer{ - Buffer: bf, - cap: chunkSize, - compressWriter: newCompressWriter(compressType, bf), - } -} - -type bufferedWriter struct { - buf interceptBuffer - writer FileWriter - accessRec *recording.AccessStats -} - -func (u *bufferedWriter) Write(ctx context.Context, p []byte) (int, error) { - n, err := u.write0(ctx, p) - u.accessRec.RecWrite(n) - return n, errors.Trace(err) -} - -func (u *bufferedWriter) write0(ctx context.Context, p []byte) (int, error) { - bytesWritten := 0 - for u.buf.Len()+len(p) > u.buf.Cap() { - // We 
won't fit p in this chunk - - // Is this chunk full? - chunkToFill := u.buf.Cap() - u.buf.Len() - if chunkToFill > 0 { - // It's not full so we write enough of p to fill it - prewrite := p[0:chunkToFill] - w, err := u.buf.Write(prewrite) - bytesWritten += w - if err != nil { - return bytesWritten, errors.Trace(err) - } - p = p[w:] - // continue buf because compressed data size may be less than Cap - Len - if u.buf.Compressed() { - continue - } - } - _ = u.buf.Flush() - err := u.uploadChunk(ctx) - if err != nil { - return 0, errors.Trace(err) - } - } - w, err := u.buf.Write(p) - bytesWritten += w - return bytesWritten, errors.Trace(err) -} - -func (u *bufferedWriter) uploadChunk(ctx context.Context) error { - if u.buf.Len() == 0 { - return nil - } - b := u.buf.Bytes() - u.buf.Reset() - _, err := u.writer.Write(ctx, b) - return errors.Trace(err) -} - -func (u *bufferedWriter) Close(ctx context.Context) error { - u.buf.Close() - err := u.uploadChunk(ctx) - if err != nil { - return errors.Trace(err) - } - return u.writer.Close(ctx) -} - -// NewUploaderWriter wraps the Writer interface over an uploader. -func NewUploaderWriter(writer FileWriter, chunkSize int, compressType CompressType) FileWriter { - return newBufferedWriter(writer, chunkSize, compressType, nil) -} - -// newBufferedWriter is used to build a buffered writer. -func newBufferedWriter(writer FileWriter, chunkSize int, compressType CompressType, accessRec *recording.AccessStats) *bufferedWriter { - return &bufferedWriter{ - writer: writer, - buf: newInterceptBuffer(chunkSize, compressType), - accessRec: accessRec, - } -} - -// BytesWriter is a Writer implementation on top of bytes.Buffer that is useful for testing. -type BytesWriter struct { - buf *bytes.Buffer -} - -// Write delegates to bytes.Buffer. -func (u *BytesWriter) Write(_ context.Context, p []byte) (int, error) { - return u.buf.Write(p) -} - -// Close delegates to bytes.Buffer. -func (*BytesWriter) Close(_ context.Context) error { - // noop - return nil -} - -// Bytes delegates to bytes.Buffer. -func (u *BytesWriter) Bytes() []byte { - return u.buf.Bytes() -} - -// String delegates to bytes.Buffer. -func (u *BytesWriter) String() string { - return u.buf.String() -} - -// Reset delegates to bytes.Buffer. -func (u *BytesWriter) Reset() { - u.buf.Reset() -} - -// NewBufferWriter creates a Writer that simply writes to a buffer (useful for testing). -func NewBufferWriter() *BytesWriter { - return &BytesWriter{buf: &bytes.Buffer{}} -}
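To make the extracted compressedio API concrete, the following is a minimal round-trip sketch using the entry points introduced by this diff (ParseCompressType, FileSuffix, NewWriter, NewReader). The standalone main package, the "zstd" spelling, and the payload string are illustrative only and are not part of the change.

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/pingcap/tidb/pkg/objstore/compressedio"
)

func main() {
	// Resolve the compress type from its user-facing spelling, as dumpling's
	// config parsing now does through compressedio.ParseCompressType.
	ct, err := compressedio.ParseCompressType("zstd")
	if err != nil {
		panic(err)
	}
	fmt.Println(ct.FileSuffix()) // prints ".zst"

	// Compress a payload into an in-memory buffer via the new Writer.
	var buf bytes.Buffer
	w := compressedio.NewWriter(ct, &buf)
	if _, err := w.Write([]byte("hello, objstore")); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil { // Close flushes the final frame
		panic(err)
	}

	// Decompress it again; ZStdDecodeConcurrency: 1 forces synchronous decoding.
	r, err := compressedio.NewReader(ct, compressedio.DecompressConfig{ZStdDecodeConcurrency: 1}, &buf)
	if err != nil {
		panic(err)
	}
	out, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // prints "hello, objstore"
}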
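And a companion sketch of the objectio side: NewUploaderWriter (which wraps NewBufferedWriter with a nil access recorder, as before the refactor) chains chunk buffering and compression in front of any objectio.Writer, which is how the S3, KS3, and GCS backends use it. The memWriter sink below is a hypothetical in-memory stand-in for a real multipart uploader and is not part of this diff; the 4 KiB chunk size is likewise arbitrary.

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"

	"github.com/pingcap/tidb/pkg/objstore/compressedio"
	"github.com/pingcap/tidb/pkg/objstore/objectio"
)

// memWriter is a hypothetical in-memory objectio.Writer used only for this sketch.
type memWriter struct{ buf bytes.Buffer }

func (m *memWriter) Write(_ context.Context, p []byte) (int, error) { return m.buf.Write(p) }

func (m *memWriter) Close(_ context.Context) error { return nil }

func main() {
	ctx := context.Background()
	sink := &memWriter{}

	// Gzip-compress writes and buffer them into 4 KiB chunks before handing
	// them to the underlying writer.
	w := objectio.NewUploaderWriter(sink, 4096, compressedio.Gzip)
	for i := 0; i < 3; i++ {
		if _, err := w.Write(ctx, []byte("some row data\n")); err != nil {
			panic(err)
		}
	}
	if err := w.Close(ctx); err != nil { // flushes and uploads the final chunk
		panic(err)
	}

	// Read the compressed bytes back through compressedio to verify the round trip.
	r, err := compressedio.NewReader(compressedio.Gzip, compressedio.DecompressConfig{}, &sink.buf)
	if err != nil {
		panic(err)
	}
	out, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out))
}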