objstore: encapsulate i/o related logic into separate pkg and refactor (#65495)

ref pingcap/tidb#65461
This commit is contained in:
D3Hunter
2026-01-09 22:22:32 +08:00
committed by GitHub
parent 920ee6e011
commit e0df5fdab0
58 changed files with 767 additions and 555 deletions

View File

@ -37,6 +37,8 @@ go_library(
"//pkg/infoschema/context",
"//pkg/meta/model",
"//pkg/objstore",
"//pkg/objstore/compressedio",
"//pkg/objstore/objectio",
"//pkg/parser",
"//pkg/parser/ast",
"//pkg/parser/format",

View File

@ -21,6 +21,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/promutil"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
@ -134,7 +135,7 @@ type Config struct {
EscapeBackslash bool
DumpEmptyDatabase bool
PosAfterConnect bool
CompressType objstore.CompressType
CompressType compressedio.CompressType
Host string
Port int
@ -586,7 +587,7 @@ func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
conf.CompressType, err = ParseCompressType(compressType)
conf.CompressType, err = compressedio.ParseCompressType(compressType)
if err != nil {
return errors.Trace(err)
}
@ -671,22 +672,6 @@ func GetConfTables(tablesList []string) (DatabaseTables, error) {
return dbTables, nil
}
// ParseCompressType parses compressType string to storage.CompressType
func ParseCompressType(compressType string) (objstore.CompressType, error) {
switch compressType {
case "", "no-compression":
return objstore.NoCompression, nil
case "gzip", "gz":
return objstore.Gzip, nil
case "snappy":
return objstore.Snappy, nil
case "zstd", "zst":
return objstore.Zstd, nil
default:
return objstore.NoCompression, errors.Errorf("unknown compress type %s", compressType)
}
}
// ParseOutputDialect parses output dialect string to Dialect
func ParseOutputDialect(outputDialect string) (CSVDialect, error) {
switch outputDialect {

View File

@ -14,6 +14,7 @@ import (
"github.com/pingcap/tidb/br/pkg/version"
tcontext "github.com/pingcap/tidb/dumpling/context"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"go.uber.org/zap"
)
@ -227,7 +228,7 @@ func recordGlobalMetaData(tctx *tcontext.Context, db *sql.Conn, buffer *bytes.Bu
func (m *globalMetadata) writeGlobalMetaData() error {
// keep consistent with mydumper. Never compress metadata
fileWriter, tearDown, err := buildFileWriter(m.tctx, m.storage, metadataPath, objstore.NoCompression)
fileWriter, tearDown, err := buildFileWriter(m.tctx, m.storage, metadataPath, compressedio.NoCompression)
if err != nil {
return err
}

View File

@ -3,6 +3,8 @@
package export
import (
"bytes"
"context"
"database/sql/driver"
"fmt"
"strings"
@ -10,18 +12,53 @@ import (
"github.com/pingcap/errors"
tcontext "github.com/pingcap/tidb/dumpling/context"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/util/promutil"
"github.com/stretchr/testify/require"
)
// BytesWriter is a Writer implementation on top of bytes.Buffer that is useful for testing.
type BytesWriter struct {
buf *bytes.Buffer
}
// Write delegates to bytes.Buffer.
func (u *BytesWriter) Write(_ context.Context, p []byte) (int, error) {
return u.buf.Write(p)
}
// Close is a no-op.
func (*BytesWriter) Close(_ context.Context) error {
// noop
return nil
}
// Bytes delegates to bytes.Buffer.
func (u *BytesWriter) Bytes() []byte {
return u.buf.Bytes()
}
// String delegates to bytes.Buffer.
func (u *BytesWriter) String() string {
return u.buf.String()
}
// Reset delegates to bytes.Buffer.
func (u *BytesWriter) Reset() {
u.buf.Reset()
}
// NewBufferWriter creates a Writer that simply writes to a buffer (useful for testing).
func NewBufferWriter() *BytesWriter {
return &BytesWriter{buf: &bytes.Buffer{}}
}
func TestWriteMeta(t *testing.T) {
createTableStmt := "CREATE TABLE `t1` (\n" +
" `a` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;\n"
specCmts := []string{"/*!40103 SET TIME_ZONE='+00:00' */;"}
meta := newMockMetaIR("t1", createTableStmt, specCmts)
writer := objstore.NewBufferWriter()
writer := NewBufferWriter()
err := WriteMeta(tcontext.Background(), meta, writer)
require.NoError(t, err)
@ -48,7 +85,7 @@ func TestWriteInsert(t *testing.T) {
"/*!40014 SET FOREIGN_KEY_CHECKS=0*/;",
}
tableIR := newMockTableIR("test", "employee", data, specCmts, colTypes)
bf := objstore.NewBufferWriter()
bf := NewBufferWriter()
conf := configForWriteSQL(cfg, UnspecifiedSize, UnspecifiedSize)
m := newMetrics(conf.PromFactory, conf.Labels)
@ -86,7 +123,7 @@ func TestWriteInsertReturnsError(t *testing.T) {
rowErr := errors.New("mock row error")
tableIR := newMockTableIR("test", "employee", data, specCmts, colTypes)
tableIR.rowErr = rowErr
bf := objstore.NewBufferWriter()
bf := NewBufferWriter()
conf := configForWriteSQL(cfg, UnspecifiedSize, UnspecifiedSize)
m := newMetrics(conf.PromFactory, conf.Labels)
@ -117,7 +154,7 @@ func TestWriteInsertInCsv(t *testing.T) {
}
colTypes := []string{"INT", "SET", "VARCHAR", "VARCHAR", "TEXT"}
tableIR := newMockTableIR("test", "employee", data, nil, colTypes)
bf := objstore.NewBufferWriter()
bf := NewBufferWriter()
// test nullValue
opt := &csvOption{separator: []byte(","), delimiter: []byte{'"'}, nullValue: "\\N", lineTerminator: []byte("\r\n")}
@ -227,7 +264,7 @@ func TestWriteInsertInCsvReturnsError(t *testing.T) {
rowErr := errors.New("mock row error")
tableIR := newMockTableIR("test", "employee", data, nil, colTypes)
tableIR.rowErr = rowErr
bf := objstore.NewBufferWriter()
bf := NewBufferWriter()
// test nullValue
opt := &csvOption{separator: []byte(","), delimiter: []byte{'"'}, nullValue: "\\N", lineTerminator: []byte("\r\n")}
@ -263,7 +300,7 @@ func TestWriteInsertInCsvWithDialect(t *testing.T) {
conf.CsvOutputDialect = CSVDialectDefault
tableIR := newMockTableIR("test", "employee", data, nil, colTypes)
m := newMetrics(conf.PromFactory, conf.Labels)
bf := objstore.NewBufferWriter()
bf := NewBufferWriter()
n, err := WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf, m)
require.NoError(t, err)
require.Equal(t, uint64(4), n)
@ -281,7 +318,7 @@ func TestWriteInsertInCsvWithDialect(t *testing.T) {
conf.CsvOutputDialect = CSVDialectRedshift
tableIR := newMockTableIR("test", "employee", data, nil, colTypes)
m := newMetrics(conf.PromFactory, conf.Labels)
bf := objstore.NewBufferWriter()
bf := NewBufferWriter()
n, err := WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf, m)
require.NoError(t, err)
require.Equal(t, uint64(4), n)
@ -299,7 +336,7 @@ func TestWriteInsertInCsvWithDialect(t *testing.T) {
conf.CsvOutputDialect = CSVDialectBigQuery
tableIR := newMockTableIR("test", "employee", data, nil, colTypes)
m := newMetrics(conf.PromFactory, conf.Labels)
bf := objstore.NewBufferWriter()
bf := NewBufferWriter()
n, err := WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf, m)
require.NoError(t, err)
require.Equal(t, uint64(4), n)
@ -329,7 +366,7 @@ func TestSQLDataTypes(t *testing.T) {
tableData := [][]driver.Value{{origin}}
colType := []string{sqlType}
tableIR := newMockTableIR("test", "t", tableData, nil, colType)
bf := objstore.NewBufferWriter()
bf := NewBufferWriter()
conf := configForWriteSQL(cfg, UnspecifiedSize, UnspecifiedSize)
m := newMetrics(conf.PromFactory, conf.Labels)

View File

@ -17,6 +17,8 @@ import (
tcontext "github.com/pingcap/tidb/dumpling/context"
"github.com/pingcap/tidb/dumpling/log"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@ -41,11 +43,11 @@ type writerPipe struct {
fileSizeLimit uint64
statementSizeLimit uint64
w objstore.FileWriter
w objectio.Writer
}
func newWriterPipe(
w objstore.FileWriter,
w objectio.Writer,
fileSizeLimit,
statementSizeLimit uint64,
metrics *metrics,
@ -121,8 +123,8 @@ func (b *writerPipe) ShouldSwitchStatement() bool {
(b.statementSizeLimit != UnspecifiedSize && b.currentStatementSize >= b.statementSizeLimit)
}
// WriteMeta writes MetaIR to a storage.FileWriter
func WriteMeta(tctx *tcontext.Context, meta MetaIR, w objstore.FileWriter) error {
// WriteMeta writes MetaIR to an objectio.Writer
func WriteMeta(tctx *tcontext.Context, meta MetaIR, w objectio.Writer) error {
tctx.L().Debug("start dumping meta data", zap.String("target", meta.TargetName()))
specCmtIter := meta.SpecialComments()
@ -140,13 +142,13 @@ func WriteMeta(tctx *tcontext.Context, meta MetaIR, w objstore.FileWriter) error
return nil
}
// WriteInsert writes TableDataIR to a storage.FileWriter in sql type
// WriteInsert writes TableDataIR to an objectio.Writer in SQL type
func WriteInsert(
pCtx *tcontext.Context,
cfg *Config,
meta TableMeta,
tblIR TableDataIR,
w objstore.FileWriter,
w objectio.Writer,
metrics *metrics,
) (n uint64, err error) {
fileRowIter := tblIR.Rows()
@ -288,13 +290,13 @@ func WriteInsert(
return counter, wp.Error()
}
// WriteInsertInCsv writes TableDataIR to a storage.FileWriter in csv type
// WriteInsertInCsv writes TableDataIR to an objectio.Writer in csv type
func WriteInsertInCsv(
pCtx *tcontext.Context,
cfg *Config,
meta TableMeta,
tblIR TableDataIR,
w objstore.FileWriter,
w objectio.Writer,
metrics *metrics,
) (n uint64, err error) {
fileRowIter := tblIR.Rows()
@ -417,7 +419,7 @@ func WriteInsertInCsv(
return counter, wp.Error()
}
func write(tctx *tcontext.Context, writer objstore.FileWriter, str string) error {
func write(tctx *tcontext.Context, writer objectio.Writer, str string) error {
_, err := writer.Write(tctx, []byte(str))
if err != nil {
// str might be very long, only output the first 200 chars
@ -429,7 +431,7 @@ func write(tctx *tcontext.Context, writer objstore.FileWriter, str string) error
return errors.Trace(err)
}
func writeBytes(tctx *tcontext.Context, writer objstore.FileWriter, p []byte) error {
func writeBytes(tctx *tcontext.Context, writer objectio.Writer, p []byte) error {
_, err := writer.Write(tctx, p)
if err != nil {
// str might be very long, only output the first 200 chars
@ -444,10 +446,10 @@ func writeBytes(tctx *tcontext.Context, writer objstore.FileWriter, p []byte) er
return errors.Trace(err)
}
func buildFileWriter(tctx *tcontext.Context, s objstore.Storage, fileName string, compressType objstore.CompressType) (objstore.FileWriter, func(ctx context.Context) error, error) {
fileName += compressFileSuffix(compressType)
func buildFileWriter(tctx *tcontext.Context, s objstore.Storage, fileName string, compressType compressedio.CompressType) (objectio.Writer, func(ctx context.Context) error, error) {
fileName += compressType.FileSuffix()
fullPath := s.URI() + "/" + fileName
writer, err := objstore.WithCompression(s, compressType, objstore.DecompressConfig{}).Create(tctx, fileName, nil)
writer, err := objstore.WithCompression(s, compressType, compressedio.DecompressConfig{}).Create(tctx, fileName, nil)
if err != nil {
tctx.L().Warn("fail to open file",
zap.String("path", fullPath),
@ -472,15 +474,15 @@ func buildFileWriter(tctx *tcontext.Context, s objstore.Storage, fileName string
return writer, tearDownRoutine, nil
}
func buildInterceptFileWriter(pCtx *tcontext.Context, s objstore.Storage, fileName string, compressType objstore.CompressType) (objstore.FileWriter, func(context.Context) error) {
fileName += compressFileSuffix(compressType)
var writer objstore.FileWriter
func buildInterceptFileWriter(pCtx *tcontext.Context, s objstore.Storage, fileName string, compressType compressedio.CompressType) (objectio.Writer, func(context.Context) error) {
fileName += compressType.FileSuffix()
var writer objectio.Writer
fullPath := s.URI() + "/" + fileName
fileWriter := &InterceptFileWriter{}
initRoutine := func() error {
// use separated context pCtx here to make sure context used in ExternalFile won't be canceled before close,
// which will cause a context canceled error when closing gcs's Writer
w, err := objstore.WithCompression(s, compressType, objstore.DecompressConfig{}).Create(pCtx, fileName, nil)
w, err := objstore.WithCompression(s, compressType, compressedio.DecompressConfig{}).Create(pCtx, fileName, nil)
if err != nil {
pCtx.L().Warn("fail to open file",
zap.String("path", fullPath),
@ -489,7 +491,7 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s objstore.Storage, fileNa
}
writer = w
pCtx.L().Debug("opened file", zap.String("path", fullPath))
fileWriter.FileWriter = writer
fileWriter.Writer = writer
return nil
}
fileWriter.initRoutine = initRoutine
@ -549,7 +551,7 @@ func newWriterError(err error) error {
// InterceptFileWriter is an interceptor of os.File,
// tracking whether a StringWriter has written something.
type InterceptFileWriter struct {
objstore.FileWriter
objectio.Writer
sync.Once
SomethingIsWritten bool
@ -557,7 +559,7 @@ type InterceptFileWriter struct {
err error
}
// Write implements storage.FileWriter.Write. It check whether writer has written something and init a file at first time
// Write implements objectio.Writer. It checks whether the writer has written something and initializes the file on the first write
func (w *InterceptFileWriter) Write(ctx context.Context, p []byte) (int, error) {
w.Do(func() { w.err = w.initRoutine() })
if len(p) > 0 {
@ -566,13 +568,13 @@ func (w *InterceptFileWriter) Write(ctx context.Context, p []byte) (int, error)
if w.err != nil {
return 0, errors.Annotate(w.err, "open file error")
}
n, err := w.FileWriter.Write(ctx, p)
n, err := w.Writer.Write(ctx, p)
return n, newWriterError(err)
}
// Close closes the InterceptFileWriter
func (w *InterceptFileWriter) Close(ctx context.Context) error {
return w.FileWriter.Close(ctx)
return w.Writer.Close(ctx)
}
func wrapBackTicks(identifier string) string {
@ -586,21 +588,6 @@ func wrapStringWith(str string, wrapper string) string {
return fmt.Sprintf("%s%s%s", wrapper, str, wrapper)
}
func compressFileSuffix(compressType objstore.CompressType) string {
switch compressType {
case objstore.NoCompression:
return ""
case objstore.Gzip:
return ".gz"
case objstore.Snappy:
return ".snappy"
case objstore.Zstd:
return ".zst"
default:
return ""
}
}
// FileFormat is the format of the output file. Currently we support SQL text and CSV file formats.
type FileFormat int32
@ -647,13 +634,13 @@ func (f FileFormat) Extension() string {
}
}
// WriteInsert writes TableDataIR to a storage.FileWriter in sql/csv type
// WriteInsert writes TableDataIR to an objectio.Writer in sql/csv type
func (f FileFormat) WriteInsert(
pCtx *tcontext.Context,
cfg *Config,
meta TableMeta,
tblIR TableDataIR,
w objstore.FileWriter,
w objectio.Writer,
metrics *metrics,
) (uint64, error) {
switch f {

View File

@ -53,6 +53,7 @@ go_library(
"//pkg/meta/metabuild",
"//pkg/meta/model",
"//pkg/objstore",
"//pkg/objstore/compressedio",
"//pkg/parser",
"//pkg/parser/ast",
"//pkg/parser/mysql",

View File

@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/worker"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/store/driver/txn"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
@ -84,7 +85,7 @@ func openParser(
tblInfo *model.TableInfo,
) (mydump.Parser, error) {
blockBufSize := int64(cfg.Mydumper.ReadBlockSize)
reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store, objstore.DecompressConfig{
reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store, compressedio.DecompressConfig{
ZStdDecodeConcurrency: 1,
})
if err != nil {

View File

@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/pkg/meta/metabuild"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
_ "github.com/pingcap/tidb/pkg/planner/core" // to setup expression.EvalAstExpr. Otherwise we cannot parse the default value
@ -466,7 +467,7 @@ func (p *PreImportInfoGetterImpl) ReadFirstNRowsByTableName(ctx context.Context,
// ReadFirstNRowsByFileMeta reads the first N rows of a data file.
// It implements the PreImportInfoGetter interface.
func (p *PreImportInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) ([]string, [][]types.Datum, error) {
reader, err := mydump.OpenReader(ctx, &dataFileMeta, p.srcStorage, objstore.DecompressConfig{
reader, err := mydump.OpenReader(ctx, &dataFileMeta, p.srcStorage, compressedio.DecompressConfig{
ZStdDecodeConcurrency: 1,
})
if err != nil {
@ -623,7 +624,7 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable(
return resultIndexRatio, isRowOrdered, nil
}
sampleFile := tableMeta.DataFiles[0].FileMeta
reader, err := mydump.OpenReader(ctx, &sampleFile, p.srcStorage, objstore.DecompressConfig{
reader, err := mydump.OpenReader(ctx, &sampleFile, p.srcStorage, compressedio.DecompressConfig{
ZStdDecodeConcurrency: 1,
})
if err != nil {

View File

@ -21,6 +21,7 @@ go_library(
"//pkg/lightning/verification",
"//pkg/meta/model",
"//pkg/objstore",
"//pkg/objstore/objectio",
"//pkg/table",
"//pkg/table/tables",
"//pkg/tablecodec",

View File

@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/verification"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"go.uber.org/zap"
@ -79,7 +80,7 @@ type Collector struct {
fileSeq int
currFileSize int64
writer objstore.FileWriter
writer objectio.Writer
}
// NewCollector creates a new conflicted KV info collector.

View File

@ -161,6 +161,7 @@ go_library(
"//pkg/meta/model",
"//pkg/metrics",
"//pkg/objstore",
"//pkg/objstore/compressedio",
"//pkg/parser",
"//pkg/parser/ast",
"//pkg/parser/auth",

View File

@ -42,6 +42,7 @@ go_library(
"//pkg/meta/model",
"//pkg/metrics",
"//pkg/objstore",
"//pkg/objstore/compressedio",
"//pkg/parser",
"//pkg/parser/ast",
"//pkg/parser/format",

View File

@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/mydump"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
pformat "github.com/pingcap/tidb/pkg/parser/format"
@ -1583,7 +1584,7 @@ func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo {
f := e.dataFiles[i]
result = append(result, LoadDataReaderInfo{
Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
fileReader, err2 := mydump.OpenReader(ctx, f, e.dataStore, objstore.DecompressConfig{})
fileReader, err2 := mydump.OpenReader(ctx, f, e.dataStore, compressedio.DecompressConfig{})
if err2 != nil {
return nil, exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(errors.GetErrStackMsg(err2), "Please check the INFILE path is correct")
}

View File

@ -47,7 +47,7 @@ import (
verify "github.com/pingcap/tidb/pkg/lightning/verification"
"github.com/pingcap/tidb/pkg/meta/autoid"
tidbmetrics "github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/sessiontxn"
@ -318,7 +318,7 @@ func (ti *TableImporter) GetKVStore() tidbkv.Storage {
func (e *LoadDataController) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) {
info := LoadDataReaderInfo{
Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, e.dataStore, objstore.DecompressConfig{
reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, e.dataStore, compressedio.DecompressConfig{
ZStdDecodeConcurrency: 1,
})
if err != nil {

View File

@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/mydump"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
@ -215,7 +216,7 @@ func (e *LoadDataWorker) LoadLocal(ctx context.Context, r io.ReadCloser) error {
readers := []importer.LoadDataReaderInfo{{
Opener: func(_ context.Context) (io.ReadSeekCloser, error) {
addedSeekReader := NewSimpleSeekerOnReadCloser(r)
return objstore.InterceptDecompressReader(addedSeekReader, compressTp2, objstore.DecompressConfig{
return objstore.InterceptDecompressReader(addedSeekReader, compressTp2, compressedio.DecompressConfig{
ZStdDecodeConcurrency: 1,
})
}}}
@ -782,7 +783,7 @@ func (s *SimpleSeekerOnReadCloser) Close() error {
return s.r.Close()
}
// GetFileSize implements storage.FileReader.
// GetFileSize implements objectio.Reader.
func (*SimpleSeekerOnReadCloser) GetFileSize() (int64, error) {
return 0, errors.Errorf("unsupported GetFileSize on SimpleSeekerOnReadCloser")
}

View File

@ -36,6 +36,7 @@ go_library(
"//pkg/lightning/metric",
"//pkg/metrics",
"//pkg/objstore",
"//pkg/objstore/objectio",
"//pkg/resourcemanager/pool/workerpool",
"//pkg/resourcemanager/util",
"//pkg/util",
@ -95,6 +96,7 @@ go_test(
"//pkg/lightning/log",
"//pkg/lightning/membuf",
"//pkg/objstore",
"//pkg/objstore/objectio",
"//pkg/resourcemanager/pool/workerpool",
"//pkg/testkit/testfailpoint",
"//pkg/util",

View File

@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/membuf"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/size"
@ -61,7 +62,7 @@ func writePlainFile(s *writeTestSuite) {
_ = s.store.DeleteFile(ctx, filePath)
buf := make([]byte, s.memoryLimit)
offset := 0
flush := func(w objstore.FileWriter) {
flush := func(w objectio.Writer) {
n, err := w.Write(ctx, buf[:offset])
intest.AssertNoError(err)
intest.Assert(offset == n)

View File

@ -25,6 +25,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/lightning/membuf"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
@ -46,7 +47,7 @@ var (
// data to improve throughput.
type byteReader struct {
ctx context.Context
storageReader objstore.FileReader
storageReader objectio.Reader
// curBuf is either smallBuf or concurrentReader.largeBuf.
curBuf [][]byte
@ -78,7 +79,7 @@ func openStoreReaderAndSeek(
name string,
initFileOffset uint64,
prefetchSize int,
) (objstore.FileReader, error) {
) (objectio.Reader, error) {
storageReader, err := store.Open(ctx, name, &objstore.ReaderOption{
StartOffset: aws.Int64(int64(initFileOffset)),
PrefetchSize: prefetchSize,
@ -94,7 +95,7 @@ func openStoreReaderAndSeek(
// concurrent reading mode.
func newByteReader(
ctx context.Context,
storageReader objstore.FileReader,
storageReader objectio.Reader,
bufSize int,
) (r *byteReader, err error) {
defer func() {

View File

@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/membuf"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)
@ -73,7 +74,7 @@ func TestByteReader(t *testing.T) {
err := st.WriteFile(context.Background(), "testfile", []byte("abcde"))
require.NoError(t, err)
newRsc := func() objstore.FileReader {
newRsc := func() objectio.Reader {
rsc, err := st.Open(context.Background(), "testfile", nil)
require.NoError(t, err)
return rsc
@ -170,7 +171,7 @@ func TestUnexpectedEOF(t *testing.T) {
err := st.WriteFile(context.Background(), "testfile", []byte("0123456789"))
require.NoError(t, err)
newRsc := func() objstore.FileReader {
newRsc := func() objectio.Reader {
rsc, err := st.Open(context.Background(), "testfile", nil)
require.NoError(t, err)
return rsc
@ -199,7 +200,7 @@ func TestEmptyContent(t *testing.T) {
err = st.WriteFile(context.Background(), "testfile", []byte(""))
require.NoError(t, err)
newRsc := func() objstore.FileReader {
newRsc := func() objectio.Reader {
rsc, err := st.Open(context.Background(), "testfile", nil)
require.NoError(t, err)
return rsc

View File

@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/membuf"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
@ -177,7 +178,7 @@ type Engine struct {
recordedDupCnt int
recordedDupSize int64
dupFile string
dupWriter objstore.FileWriter
dupWriter objectio.Writer
dupKVStore *KeyValueStore
}

View File

@ -17,7 +17,7 @@ package external
import (
"context"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/objectio"
)
const (
@ -29,7 +29,7 @@ const (
// KeyValueStore stores key-value pairs and maintains the range properties.
type KeyValueStore struct {
dataWriter objstore.FileWriter
dataWriter objectio.Writer
rc *rangePropertiesCollector
ctx context.Context
@ -41,7 +41,7 @@ type KeyValueStore struct {
// rangePropertiesCollector.
func NewKeyValueStore(
ctx context.Context,
dataWriter objstore.FileWriter,
dataWriter objectio.Writer,
rangePropertiesCollector *rangePropertiesCollector,
) *KeyValueStore {
kvStore := &KeyValueStore{

View File

@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/membuf"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
@ -37,7 +38,7 @@ type trackOpenMemStorage struct {
opened atomic.Int32
}
func (s *trackOpenMemStorage) Open(ctx context.Context, path string, _ *objstore.ReaderOption) (objstore.FileReader, error) {
func (s *trackOpenMemStorage) Open(ctx context.Context, path string, _ *objstore.ReaderOption) (objectio.Reader, error) {
s.opened.Inc()
r, err := s.MemStorage.Open(ctx, path, nil)
if err != nil {
@ -47,12 +48,12 @@ func (s *trackOpenMemStorage) Open(ctx context.Context, path string, _ *objstore
}
type trackOpenFileReader struct {
objstore.FileReader
objectio.Reader
store *trackOpenMemStorage
}
func (r *trackOpenFileReader) Close() error {
err := r.FileReader.Close()
err := r.Reader.Close()
if err != nil {
return err
}
@ -320,7 +321,7 @@ func testMergeIterSwitchMode(t *testing.T, f func([]byte, int) []byte) {
}
type eofReader struct {
objstore.FileReader
objectio.Reader
}
func (r eofReader) Seek(_ int64, _ int) (int64, error) {
@ -651,7 +652,7 @@ func (s *slowOpenStorage) Open(
ctx context.Context,
filePath string,
o *objstore.ReaderOption,
) (objstore.FileReader, error) {
) (objectio.Reader, error) {
time.Sleep(s.sleep)
s.openCnt.Inc()
return s.MemStorage.Open(ctx, filePath, o)

View File

@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/membuf"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
@ -73,8 +74,8 @@ type OneFileWriter struct {
rnd *rand.Rand
dataFile string
statFile string
dataWriter objstore.FileWriter
statWriter objstore.FileWriter
dataWriter objectio.Writer
statWriter objectio.Writer
onClose OnWriterCloseFunc
closed bool
@ -89,7 +90,7 @@ type OneFileWriter struct {
// below fields are only used when onDup is OnDuplicateKeyRecord.
recordedDupCnt int
dupFile string
dupWriter objstore.FileWriter
dupWriter objectio.Writer
dupKVStore *KeyValueStore
minKey []byte

View File

@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/membuf"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
@ -811,7 +812,7 @@ func (w *Writer) reCalculateKVSize() int64 {
func (w *Writer) createStorageWriter(ctx context.Context) (
dataFile, statFile string,
data, stats objstore.FileWriter,
data, stats objectio.Writer,
err error,
) {
dataPath := filepath.Join(w.getPartitionedPrefix(), strconv.Itoa(w.currentSeq))
@ -834,7 +835,7 @@ func (w *Writer) createStorageWriter(ctx context.Context) (
return dataPath, statPath, dataWriter, statsWriter, nil
}
func (w *Writer) createDupWriter(ctx context.Context) (string, objstore.FileWriter, error) {
func (w *Writer) createDupWriter(ctx context.Context) (string, objectio.Writer, error) {
path := filepath.Join(w.getPartitionedPrefix()+dupSuffix, strconv.Itoa(w.currentSeq))
writer, err := w.store.Create(ctx, path, &objstore.WriterOption{
Concurrency: 20,

View File

@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/stretchr/testify/require"
@ -495,19 +496,19 @@ func (s *writerFirstCloseFailStorage) Create(
ctx context.Context,
path string,
option *objstore.WriterOption,
) (objstore.FileWriter, error) {
) (objectio.Writer, error) {
w, err := s.Storage.Create(ctx, path, option)
if err != nil {
return nil, err
}
if strings.Contains(path, statSuffix) {
return &firstCloseFailWriter{FileWriter: w, shouldFail: &s.shouldFail}, nil
return &firstCloseFailWriter{Writer: w, shouldFail: &s.shouldFail}, nil
}
return w, nil
}
type firstCloseFailWriter struct {
objstore.FileWriter
objectio.Writer
shouldFail *bool
}
@ -516,7 +517,7 @@ func (w *firstCloseFailWriter) Close(ctx context.Context) error {
*w.shouldFail = false
return fmt.Errorf("first close fail")
}
return w.FileWriter.Close(ctx)
return w.Writer.Close(ctx)
}
func TestFlushKVsRetry(t *testing.T) {

View File

@ -28,6 +28,8 @@ go_library(
"//pkg/lightning/metric",
"//pkg/lightning/worker",
"//pkg/objstore",
"//pkg/objstore/compressedio",
"//pkg/objstore/objectio",
"//pkg/parser",
"//pkg/parser/ast",
"//pkg/parser/charset",

View File

@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/lightning/metric"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/util/logutil"
regexprrouter "github.com/pingcap/tidb/pkg/util/regexpr-router"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
@ -853,7 +854,7 @@ func (l *MDLoader) GetAllFiles() map[string]FileInfo {
func calculateFileBytes(ctx context.Context,
dataFile string,
compressType objstore.CompressType,
compressType compressedio.CompressType,
store objstore.Storage,
offset int64) (tot int, pos int64, err error) {
bytes := make([]byte, sampleCompressedFileSize)
@ -863,7 +864,7 @@ func calculateFileBytes(ctx context.Context,
}
defer reader.Close()
decompressConfig := objstore.DecompressConfig{ZStdDecodeConcurrency: 1}
decompressConfig := compressedio.DecompressConfig{ZStdDecodeConcurrency: 1}
compressReader, err := objstore.NewLimitedInterceptReader(reader, compressType, decompressConfig, offset)
if err != nil {
return 0, 0, errors.Trace(err)

View File

@ -23,6 +23,7 @@ import (
"github.com/apache/arrow-go/v18/parquet/file"
"github.com/apache/arrow-go/v18/parquet/schema"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/objectio"
)
// ParquetColumn defines the properties of a column in a Parquet file.
@ -38,7 +39,7 @@ type ParquetColumn struct {
}
type writeWrapper struct {
Writer objstore.FileWriter
Writer objectio.Writer
}
func (*writeWrapper) Seek(_ int64, _ int) (int64, error) {

View File

@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/metric"
"github.com/pingcap/tidb/pkg/lightning/worker"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/logutil"
@ -676,7 +677,7 @@ func OpenReader(
ctx context.Context,
fileMeta *SourceFileMeta,
store objstore.Storage,
decompressCfg objstore.DecompressConfig,
decompressCfg compressedio.DecompressConfig,
) (reader objstore.ReadSeekCloser, err error) {
switch {
case fileMeta.Type == SourceTypeParquet:

View File

@ -25,6 +25,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/lightning/worker"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/spkg/bom"
@ -91,7 +92,7 @@ func ExportStatement(ctx context.Context, store objstore.Storage,
if err != nil {
return nil, errors.Trace(err)
}
store = objstore.WithCompression(store, compressType, objstore.DecompressConfig{
store = objstore.WithCompression(store, compressType, compressedio.DecompressConfig{
ZStdDecodeConcurrency: 1,
})
}

View File

@ -25,7 +25,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/util/filter"
"go.uber.org/zap"
)
@ -88,18 +88,18 @@ const (
)
// ToStorageCompressType converts Compression to compressedio.CompressType.
func ToStorageCompressType(compression Compression) (objstore.CompressType, error) {
func ToStorageCompressType(compression Compression) (compressedio.CompressType, error) {
switch compression {
case CompressionGZ:
return objstore.Gzip, nil
return compressedio.Gzip, nil
case CompressionSnappy:
return objstore.Snappy, nil
return compressedio.Snappy, nil
case CompressionZStd:
return objstore.Zstd, nil
return compressedio.Zstd, nil
case CompressionNone:
return objstore.NoCompression, nil
return compressedio.NoCompression, nil
default:
return objstore.NoCompression,
return compressedio.NoCompression,
errors.Errorf("compression %d doesn't have related storage compressType", compression)
}
}

View File

@ -22,7 +22,6 @@ go_library(
"s3.go",
"s3_interface.go",
"storage.go",
"writer.go",
],
importpath = "github.com/pingcap/tidb/pkg/objstore",
visibility = ["//visibility:public"],
@ -32,6 +31,8 @@ go_library(
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//pkg/metrics",
"//pkg/objstore/compressedio",
"//pkg/objstore/objectio",
"//pkg/objstore/recording",
"//pkg/sessionctx/variable",
"//pkg/util",
@ -66,9 +67,6 @@ go_library(
"@com_github_docker_go_units//:go-units",
"@com_github_go_resty_resty_v2//:resty",
"@com_github_google_uuid//:uuid",
"@com_github_klauspost_compress//gzip",
"@com_github_klauspost_compress//snappy",
"@com_github_klauspost_compress//zstd",
"@com_github_ks3sdklib_aws_sdk_go//aws",
"@com_github_ks3sdklib_aws_sdk_go//aws/awserr",
"@com_github_ks3sdklib_aws_sdk_go//aws/credentials",
@ -108,7 +106,6 @@ go_test(
"s3_flags_test.go",
"s3_test.go",
"storage_test.go",
"writer_test.go",
],
embed = [":objstore"],
flaky = True,
@ -116,6 +113,8 @@ go_test(
deps = [
"//br/pkg/mock",
"//dumpling/context",
"//pkg/objstore/compressedio",
"//pkg/objstore/objectio",
"//pkg/objstore/recording",
"//pkg/util/intest",
"@com_github_aws_aws_sdk_go_v2//aws",
@ -125,7 +124,6 @@ go_test(
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//:azblob",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//bloberror",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_klauspost_compress//zstd",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",

View File

@ -43,6 +43,8 @@ import (
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/spf13/pflag"
"go.uber.org/zap"
)
@ -636,7 +638,7 @@ func (s *AzureBlobStorage) DeleteFiles(ctx context.Context, names []string) erro
}
// Open implements the StorageReader interface.
func (s *AzureBlobStorage) Open(ctx context.Context, name string, o *ReaderOption) (FileReader, error) {
func (s *AzureBlobStorage) Open(ctx context.Context, name string, o *ReaderOption) (objectio.Reader, error) {
client := s.containerClient.NewBlockBlobClient(s.withPrefix(name))
resp, err := client.GetProperties(ctx, nil)
if err != nil {
@ -714,7 +716,7 @@ func (s *AzureBlobStorage) URI() string {
const azblobChunkSize = 64 * 1024 * 1024
// Create implements the StorageWriter interface.
func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOption) (FileWriter, error) {
func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOption) (objectio.Writer, error) {
client := s.containerClient.NewBlockBlobClient(s.withPrefix(name))
uploader := &azblobUploader{
blobClient: client,
@ -727,7 +729,7 @@ func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOptio
cpkInfo: s.cpkInfo,
}
uploaderWriter := newBufferedWriter(uploader, azblobChunkSize, NoCompression, nil)
uploaderWriter := objectio.NewBufferedWriter(uploader, azblobChunkSize, compressedio.NoCompression, nil)
return uploaderWriter, nil
}

View File

@ -30,6 +30,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/stretchr/testify/require"
)
@ -464,7 +465,7 @@ func TestAzblobSeekToEndShouldNotError(t *testing.T) {
}
type wr struct {
w FileWriter
w objectio.Writer
ctx context.Context
}

View File

@ -24,6 +24,7 @@ import (
"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"go.uber.org/multierr"
)
@ -152,7 +153,7 @@ func (d *Batched) Rename(ctx context.Context, oldName, newName string) error {
}
// Create implements the Storage interface.
func (d *Batched) Create(ctx context.Context, path string, option *WriterOption) (FileWriter, error) {
func (d *Batched) Create(ctx context.Context, path string, option *WriterOption) (objectio.Writer, error) {
return nil, errors.Annotatef(berrors.ErrStorageUnknown, "ExternalStorage.Create isn't allowed in batch mode for now.")
}

View File

@ -21,18 +21,20 @@ import (
"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/pingcap/tidb/pkg/objstore/recording"
)
type withCompression struct {
Storage
compressType CompressType
decompressCfg DecompressConfig
compressType compressedio.CompressType
decompressCfg compressedio.DecompressConfig
}
// WithCompression returns a Storage with compress option
func WithCompression(inner Storage, compressionType CompressType, cfg DecompressConfig) Storage {
if compressionType == NoCompression {
func WithCompression(inner Storage, compressionType compressedio.CompressType, cfg compressedio.DecompressConfig) Storage {
if compressionType == compressedio.NoCompression {
return inner
}
return &withCompression{
@ -42,21 +44,21 @@ func WithCompression(inner Storage, compressionType CompressType, cfg Decompress
}
}
func (w *withCompression) Create(ctx context.Context, name string, o *WriterOption) (FileWriter, error) {
func (w *withCompression) Create(ctx context.Context, name string, o *WriterOption) (objectio.Writer, error) {
writer, err := w.Storage.Create(ctx, name, o)
if err != nil {
return nil, errors.Trace(err)
}
// some implementations already wrap the writer, so we need to unwrap it
if bw, ok := writer.(*bufferedWriter); ok {
writer = bw.writer
if bw, ok := writer.(*objectio.BufferedWriter); ok {
writer = bw.GetWriter()
}
// the external storage will do access recording, so no need to pass it again.
compressedWriter := newBufferedWriter(writer, hardcodedS3ChunkSize, w.compressType, nil)
compressedWriter := objectio.NewBufferedWriter(writer, hardcodedS3ChunkSize, w.compressType, nil)
return compressedWriter, nil
}
func (w *withCompression) Open(ctx context.Context, path string, o *ReaderOption) (FileReader, error) {
func (w *withCompression) Open(ctx context.Context, path string, o *ReaderOption) (objectio.Reader, error) {
fileReader, err := w.Storage.Open(ctx, path, o)
if err != nil {
return nil, errors.Trace(err)
@ -70,7 +72,7 @@ func (w *withCompression) Open(ctx context.Context, path string, o *ReaderOption
func (w *withCompression) WriteFile(ctx context.Context, name string, data []byte) error {
bf := bytes.NewBuffer(make([]byte, 0, len(data)))
compressBf := newCompressWriter(w.compressType, bf)
compressBf := compressedio.NewWriter(w.compressType, bf)
_, err := compressBf.Write(data)
if err != nil {
return errors.Trace(err)
@ -88,7 +90,7 @@ func (w *withCompression) ReadFile(ctx context.Context, name string) ([]byte, er
return data, errors.Trace(err)
}
bf := bytes.NewBuffer(data)
compressBf, err := newCompressReader(w.compressType, w.decompressCfg, bf)
compressBf, err := compressedio.NewReader(w.compressType, w.decompressCfg, bf)
if err != nil {
return nil, err
}
@ -103,18 +105,18 @@ type compressReader struct {
}
// InterceptDecompressReader intercepts the reader and wraps it with a decompress
// reader on the given FileReader. Note that the returned
// FileReader does not have the property that Seek(0, io.SeekCurrent)
// reader on the given Reader. Note that the returned
// Reader does not have the property that Seek(0, io.SeekCurrent)
// equals total bytes Read() if the decompress reader is used.
func InterceptDecompressReader(
fileReader FileReader,
compressType CompressType,
cfg DecompressConfig,
) (FileReader, error) {
if compressType == NoCompression {
fileReader objectio.Reader,
compressType compressedio.CompressType,
cfg compressedio.DecompressConfig,
) (objectio.Reader, error) {
if compressType == compressedio.NoCompression {
return fileReader, nil
}
r, err := newCompressReader(compressType, cfg, fileReader)
r, err := compressedio.NewReader(compressType, cfg, fileReader)
if err != nil {
return nil, errors.Trace(err)
}
@ -127,11 +129,11 @@ func InterceptDecompressReader(
// NewLimitedInterceptReader creates a decompress reader with limit n.
func NewLimitedInterceptReader(
fileReader FileReader,
compressType CompressType,
cfg DecompressConfig,
fileReader objectio.Reader,
compressType compressedio.CompressType,
cfg compressedio.DecompressConfig,
n int64,
) (FileReader, error) {
) (objectio.Reader, error) {
newFileReader := fileReader
if n < 0 {
return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support negative limit, n: %d", n)
@ -164,7 +166,7 @@ func (c *compressReader) GetFileSize() (int64, error) {
type flushStorageWriter struct {
writer io.Writer
flusher flusher
flusher compressedio.Flusher
closer io.Closer
accessRec *recording.AccessStats
}
@ -183,7 +185,7 @@ func (w *flushStorageWriter) Close(_ context.Context) error {
return w.closer.Close()
}
func newFlushStorageWriter(writer io.Writer, flusher2 flusher, closer io.Closer, accessRec *recording.AccessStats) *flushStorageWriter {
func newFlushStorageWriter(writer io.Writer, flusher2 compressedio.Flusher, closer io.Closer, accessRec *recording.AccessStats) *flushStorageWriter {
return &flushStorageWriter{
writer: writer,
flusher: flusher2,

View File

@ -22,6 +22,7 @@ import (
"strings"
"testing"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/stretchr/testify/require"
)
@ -32,7 +33,7 @@ func TestWithCompressReadWriteFile(t *testing.T) {
ctx := context.Background()
storage, err := Create(ctx, backend, true)
require.NoError(t, err)
storage = WithCompression(storage, Gzip, DecompressConfig{})
storage = WithCompression(storage, compressedio.Gzip, compressedio.DecompressConfig{})
name := "with compress test"
content := "hello,world!"
fileName := strings.ReplaceAll(name, " ", "-") + ".txt.gz"
@ -42,7 +43,7 @@ func TestWithCompressReadWriteFile(t *testing.T) {
// make sure compressed file is written correctly
file, err := os.Open(filepath.Join(dir, fileName))
require.NoError(t, err)
uncompressedFile, err := newCompressReader(Gzip, DecompressConfig{}, file)
uncompressedFile, err := compressedio.NewReader(compressedio.Gzip, compressedio.DecompressConfig{}, file)
require.NoError(t, err)
newContent, err := io.ReadAll(uncompressedFile)
require.NoError(t, err)

View File

@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "compressedio",
srcs = [
"buffer.go",
"def.go",
"reader.go",
"writer.go",
],
importpath = "github.com/pingcap/tidb/pkg/objstore/compressedio",
visibility = ["//visibility:public"],
deps = [
"@com_github_klauspost_compress//gzip",
"@com_github_klauspost_compress//snappy",
"@com_github_klauspost_compress//zstd",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@org_uber_go_zap//:zap",
],
)

View File

@ -0,0 +1,74 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compressedio
import (
"bytes"
"github.com/pingcap/errors"
)
// Buffer is a compressed buffer.
type Buffer struct {
*bytes.Buffer
writer Writer
cap int
}
// Write implements objectio.interceptBuffer.
func (b *Buffer) Write(p []byte) (int, error) {
written, err := b.writer.Write(p)
return written, errors.Trace(err)
}
// Len implements objectio.interceptBuffer.
func (b *Buffer) Len() int {
return b.Buffer.Len()
}
// Cap implements objectio.interceptBuffer.
func (b *Buffer) Cap() int {
return b.cap
}
// Reset implements objectio.interceptBuffer.
func (b *Buffer) Reset() {
b.Buffer.Reset()
}
// Flush implements objectio.interceptBuffer.
func (b *Buffer) Flush() error {
return b.writer.Flush()
}
// Close implements objectio.interceptBuffer.
func (b *Buffer) Close() error {
return b.writer.Close()
}
// Compressed implements objectio.interceptBuffer.
func (*Buffer) Compressed() bool {
return true
}
// NewBuffer creates a new Buffer.
func NewBuffer(chunkSize int, compressType CompressType) *Buffer {
bf := bytes.NewBuffer(make([]byte, 0, chunkSize))
return &Buffer{
Buffer: bf,
cap: chunkSize,
writer: NewWriter(compressType, bf),
}
}
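
For orientation, the new Buffer pairs an in-memory bytes.Buffer with a compressing Writer: bytes passed to Write go through the compressor, and once Close (or Flush) has run, the embedded buffer holds the compressed chunk and Len reports its compressed size. A minimal, hedged usage sketch, not part of this commit (the standalone main wrapper and the sample payload are illustrative only; the compressedio identifiers are the ones introduced in this file):

package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/objstore/compressedio"
)

func main() {
	// A 64 KiB chunk whose contents are gzip-compressed as they are written.
	buf := compressedio.NewBuffer(64*1024, compressedio.Gzip)
	if _, err := buf.Write([]byte("some rows to upload")); err != nil {
		panic(err)
	}
	// Close flushes the gzip stream into the embedded bytes.Buffer.
	if err := buf.Close(); err != nil {
		panic(err)
	}
	fmt.Printf("compressed chunk is %d bytes\n", buf.Len())
}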

View File

@ -0,0 +1,75 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compressedio
import "github.com/pingcap/errors"
// CompressType represents the type of compression.
type CompressType uint8
const (
// NoCompression won't compress given bytes.
NoCompression CompressType = iota
// Gzip will compress given bytes in gzip format.
Gzip
// Snappy will compress given bytes in snappy format.
Snappy
// Zstd will compress given bytes in zstd format.
Zstd
)
// FileSuffix returns the file suffix for the compress type.
func (ct CompressType) FileSuffix() string {
switch ct {
case NoCompression:
return ""
case Gzip:
return ".gz"
case Snappy:
return ".snappy"
case Zstd:
return ".zst"
default:
return ""
}
}
// Flusher is the flush interface.
type Flusher interface {
Flush() error
}
// DecompressConfig is the config used for decompression.
type DecompressConfig struct {
// ZStdDecodeConcurrency is only used for ZStd decompression, see WithDecoderConcurrency.
// If not 1, ZStd will decode the file asynchronously.
ZStdDecodeConcurrency int
}
// ParseCompressType parses a compressType string into a CompressType
func ParseCompressType(compressType string) (CompressType, error) {
switch compressType {
case "", "no-compression":
return NoCompression, nil
case "gzip", "gz":
return Gzip, nil
case "snappy":
return Snappy, nil
case "zstd", "zst":
return Zstd, nil
default:
return NoCompression, errors.Errorf("unknown compress type %s", compressType)
}
}
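
Taken together, ParseCompressType and FileSuffix replace the helpers previously duplicated in dumpling (ParseCompressType in config, compressFileSuffix in writer_util): a caller can map a user-facing flag to a CompressType and derive the matching output file suffix in one place. A small illustrative sketch, not part of this commit (the main wrapper and the literal file name are assumptions):

package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/objstore/compressedio"
)

func main() {
	// "gzip" and "gz" both map to compressedio.Gzip.
	ct, err := compressedio.ParseCompressType("gzip")
	if err != nil {
		panic(err)
	}
	// Mirrors how buildFileWriter names its output: base name plus the type's suffix.
	fmt.Println("t1.000000000.sql" + ct.FileSuffix()) // t1.000000000.sql.gz
}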

View File

@ -0,0 +1,42 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compressedio
import (
"io"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
)
// NewReader reads compressed data.
// Only used in tests for now.
func NewReader(compressType CompressType, cfg DecompressConfig, r io.Reader) (io.Reader, error) {
switch compressType {
case Gzip:
return gzip.NewReader(r)
case Snappy:
return snappy.NewReader(r), nil
case Zstd:
options := []zstd.DOption{}
if cfg.ZStdDecodeConcurrency > 0 {
options = append(options, zstd.WithDecoderConcurrency(cfg.ZStdDecodeConcurrency))
}
return zstd.NewReader(r, options...)
default:
return nil, nil
}
}

View File

@ -0,0 +1,49 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compressedio
import (
"io"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pingcap/log"
"go.uber.org/zap"
)
// Writer is a compressed writer interface.
type Writer interface {
io.WriteCloser
Flusher
}
// NewWriter creates a compress writer
func NewWriter(compressType CompressType, w io.Writer) Writer {
switch compressType {
case Gzip:
return gzip.NewWriter(w)
case Snappy:
return snappy.NewBufferedWriter(w)
case Zstd:
newWriter, err := zstd.NewWriter(w)
if err != nil {
log.Warn("Met error when creating new writer for Zstd type file", zap.Error(err))
}
return newWriter
default:
return nil
}
}
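
Because NewWriter and NewReader (from reader.go above) are symmetric over the same CompressType, a quick compress/decompress round trip through a bytes.Buffer is enough to exercise the package. A hedged sketch for illustration only (the main wrapper and payload string are assumptions; setting ZStdDecodeConcurrency to 1 keeps decoding synchronous, as the other call sites in this diff do):

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/pingcap/tidb/pkg/objstore/compressedio"
)

func main() {
	var buf bytes.Buffer

	// Compress a payload with the zstd writer; Close flushes the frame.
	w := compressedio.NewWriter(compressedio.Zstd, &buf)
	if _, err := w.Write([]byte("hello, objstore")); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}

	// Decompress it again with a synchronous zstd decoder.
	r, err := compressedio.NewReader(compressedio.Zstd, compressedio.DecompressConfig{ZStdDecodeConcurrency: 1}, &buf)
	if err != nil {
		panic(err)
	}
	out, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // hello, objstore
}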

View File

@ -33,6 +33,8 @@ import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/pingcap/tidb/pkg/objstore/recording"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
@ -267,7 +269,7 @@ func (s *GCSStorage) FileExists(ctx context.Context, name string) (bool, error)
}
// Open a Reader by file path.
func (s *GCSStorage) Open(ctx context.Context, path string, o *ReaderOption) (FileReader, error) {
func (s *GCSStorage) Open(ctx context.Context, path string, o *ReaderOption) (objectio.Reader, error) {
object := s.objectName(path)
handle := s.GetBucketHandle().Object(object)
@ -360,7 +362,7 @@ func (s *GCSStorage) URI() string {
}
// Create implements Storage interface.
func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption) (FileWriter, error) {
func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption) (objectio.Writer, error) {
// NewGCSWriter requires real testing environment on Google Cloud.
mockGCS := intest.InTest && strings.Contains(s.gcs.GetEndpoint(), "127.0.0.1")
if wo == nil || wo.Concurrency <= 1 || mockGCS {
@ -368,7 +370,7 @@ func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption)
wc := s.GetBucketHandle().Object(object).NewWriter(ctx)
wc.StorageClass = s.gcs.StorageClass
wc.PredefinedACL = s.gcs.PredefinedAcl
return newFlushStorageWriter(wc, &emptyFlusher{}, wc, s.accessRec), nil
return newFlushStorageWriter(wc, &objectio.EmptyFlusher{}, wc, s.accessRec), nil
}
uri := s.objectName(name)
// 5MB is the minimum part size for GCS.
@ -377,9 +379,9 @@ func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption)
if err != nil {
return nil, errors.Trace(err)
}
fw := newFlushStorageWriter(w, &emptyFlusher{}, w, s.accessRec)
fw := newFlushStorageWriter(w, &objectio.EmptyFlusher{}, w, s.accessRec)
// we already pass the accessRec to flushStorageWriter.
bw := newBufferedWriter(fw, int(partSize), NoCompression, nil)
bw := objectio.NewBufferedWriter(fw, int(partSize), compressedio.NoCompression, nil)
return bw, nil
}

View File

@ -24,6 +24,7 @@ import (
"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/pkg/objstore/objectio"
)
// HDFSStorage represents HDFS storage.
@ -120,7 +121,7 @@ func (*HDFSStorage) DeleteFiles(_ context.Context, _ []string) error {
}
// Open a Reader by file path. path is relative path to storage base path
func (*HDFSStorage) Open(_ context.Context, _ string, _ *ReaderOption) (FileReader, error) {
func (*HDFSStorage) Open(_ context.Context, _ string, _ *ReaderOption) (objectio.Reader, error) {
return nil, errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup")
}
@ -140,7 +141,7 @@ func (s *HDFSStorage) URI() string {
}
// Create opens a file writer by path. path is relative path to storage base path
func (*HDFSStorage) Create(_ context.Context, _ string, _ *WriterOption) (FileWriter, error) {
func (*HDFSStorage) Create(_ context.Context, _ string, _ *WriterOption) (objectio.Writer, error) {
return nil, errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup")
}

View File

@ -35,6 +35,8 @@ import (
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/pingcap/tidb/pkg/objstore/recording"
"github.com/pingcap/tidb/pkg/util/injectfailpoint"
"github.com/pingcap/tidb/pkg/util/prefetch"
@ -445,7 +447,7 @@ func (rs *KS3Storage) URI() string {
}
// Open a Reader by file path.
func (rs *KS3Storage) Open(ctx context.Context, path string, o *ReaderOption) (FileReader, error) {
func (rs *KS3Storage) Open(ctx context.Context, path string, o *ReaderOption) (objectio.Reader, error) {
start := int64(0)
end := int64(0)
prefetchSize := 0
@ -678,7 +680,7 @@ func (r *ks3ObjectReader) GetFileSize() (int64, error) {
}
// createUploader create multi upload request.
func (rs *KS3Storage) createUploader(ctx context.Context, name string) (FileWriter, error) {
func (rs *KS3Storage) createUploader(ctx context.Context, name string) (objectio.Writer, error) {
input := &s3.CreateMultipartUploadInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + name),
@ -708,8 +710,8 @@ func (rs *KS3Storage) createUploader(ctx context.Context, name string) (FileWrit
}
// Create creates multi upload request.
func (rs *KS3Storage) Create(ctx context.Context, name string, option *WriterOption) (FileWriter, error) {
var uploader FileWriter
func (rs *KS3Storage) Create(ctx context.Context, name string, option *WriterOption) (objectio.Writer, error) {
var uploader objectio.Writer
var err error
if option == nil || option.Concurrency <= 1 {
uploader, err = rs.createUploader(ctx, name)
@ -747,7 +749,7 @@ func (rs *KS3Storage) Create(ctx context.Context, name string, option *WriterOpt
if option != nil && option.PartSize > 0 {
bufSize = int(option.PartSize)
}
uploaderWriter := newBufferedWriter(uploader, bufSize, NoCompression, rs.accessRec)
uploaderWriter := objectio.NewBufferedWriter(uploader, bufSize, compressedio.NoCompression, rs.accessRec)
return uploaderWriter, nil
}

View File

@ -28,6 +28,7 @@ import (
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"go.uber.org/zap"
)
@ -209,7 +210,7 @@ func (l *LocalStorage) URI() string {
}
// Open a Reader by file path, path is a relative path to base path.
func (l *LocalStorage) Open(_ context.Context, path string, o *ReaderOption) (FileReader, error) {
func (l *LocalStorage) Open(_ context.Context, path string, o *ReaderOption) (objectio.Reader, error) {
//nolint: gosec
f, err := os.Open(filepath.Join(l.base, path))
if err != nil {
@ -273,7 +274,7 @@ func (f *localFile) GetFileSize() (int64, error) {
}
// Create implements Storage interface.
func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (FileWriter, error) {
func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (objectio.Writer, error) {
filename := filepath.Join(l.base, name)
dir := filepath.Dir(filename)
err := os.MkdirAll(dir, 0750)

View File

@ -24,6 +24,7 @@ import (
"testing"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/stretchr/testify/require"
)
@ -199,7 +200,7 @@ func TestLocalFileReadRange(t *testing.T) {
require.NoError(t, err)
require.NoError(t, w.Close(ctx))
checkContent := func(r FileReader, expected string) {
checkContent := func(r objectio.Reader, expected string) {
buf := make([]byte, 10)
n, _ := r.Read(buf)
require.Equal(t, expected, string(buf[:n]))

View File

@ -25,6 +25,7 @@ import (
"sync"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"go.uber.org/atomic"
)
@ -146,7 +147,7 @@ func (s *MemStorage) FileExists(ctx context.Context, name string) (bool, error)
// Open opens a Reader by file path.
// It implements the `Storage` interface
func (s *MemStorage) Open(ctx context.Context, filePath string, o *ReaderOption) (FileReader, error) {
func (s *MemStorage) Open(ctx context.Context, filePath string, o *ReaderOption) (objectio.Reader, error) {
if err := ctx.Err(); err != nil {
return nil, errors.Trace(err)
}
@ -227,7 +228,7 @@ func (*MemStorage) URI() string {
// Create creates a file and returns a writer to write data into.
// When the writer is closed, the data is stored in the file.
// It implements the `Storage` interface
func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (FileWriter, error) {
func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (objectio.Writer, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
@ -312,7 +313,7 @@ type memFileWriter struct {
}
// Write writes the data into the mem storage file buffer.
// It implements the `FileWriter` interface
// It implements the `Writer` interface
func (w *memFileWriter) Write(ctx context.Context, p []byte) (int, error) {
select {
case <-ctx.Done():

View File

@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/objstore",
"//pkg/objstore/objectio",
"@org_uber_go_mock//gomock",
],
)

View File

@ -14,6 +14,7 @@ import (
reflect "reflect"
objstore "github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/objectio"
gomock "go.uber.org/mock/gomock"
)
@ -58,10 +59,10 @@ func (mr *MockStorageMockRecorder) Close() *gomock.Call {
}
// Create mocks base method.
func (m *MockStorage) Create(arg0 context.Context, arg1 string, arg2 *objstore.WriterOption) (objstore.FileWriter, error) {
func (m *MockStorage) Create(arg0 context.Context, arg1 string, arg2 *objstore.WriterOption) (objectio.Writer, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Create", arg0, arg1, arg2)
ret0, _ := ret[0].(objstore.FileWriter)
ret0, _ := ret[0].(objectio.Writer)
ret1, _ := ret[1].(error)
return ret0, ret1
}
@ -116,10 +117,10 @@ func (mr *MockStorageMockRecorder) FileExists(arg0, arg1 any) *gomock.Call {
}
// Open mocks base method.
func (m *MockStorage) Open(arg0 context.Context, arg1 string, arg2 *objstore.ReaderOption) (objstore.FileReader, error) {
func (m *MockStorage) Open(arg0 context.Context, arg1 string, arg2 *objstore.ReaderOption) (objectio.Reader, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Open", arg0, arg1, arg2)
ret0, _ := ret[0].(objstore.FileReader)
ret0, _ := ret[0].(objectio.Reader)
ret1, _ := ret[1].(error)
return ret0, ret1
}

View File

@ -16,6 +16,8 @@ package objstore
import (
"context"
"github.com/pingcap/tidb/pkg/objstore/objectio"
)
type noopStorage struct{}
@ -46,7 +48,7 @@ func (*noopStorage) FileExists(_ context.Context, _ string) (bool, error) {
}
// Open a Reader by file path.
func (*noopStorage) Open(_ context.Context, _ string, _ *ReaderOption) (FileReader, error) {
func (*noopStorage) Open(_ context.Context, _ string, _ *ReaderOption) (objectio.Reader, error) {
return noopReader{}, nil
}
@ -60,7 +62,7 @@ func (*noopStorage) URI() string {
}
// Create implements Storage interface.
func (*noopStorage) Create(_ context.Context, _ string, _ *WriterOption) (FileWriter, error) {
func (*noopStorage) Create(_ context.Context, _ string, _ *WriterOption) (objectio.Writer, error) {
return &NoopWriter{}, nil
}
@ -97,12 +99,12 @@ func (noopReader) GetFileSize() (int64, error) {
// NoopWriter is a writer that does nothing.
type NoopWriter struct{}
// Write implements FileWriter interface.
// Write implements objectio.Writer interface.
func (NoopWriter) Write(_ context.Context, p []byte) (int, error) {
return len(p), nil
}
// Close implements FileWriter interface.
// Close implements objectio.Writer interface.
func (NoopWriter) Close(_ context.Context) error {
return nil
}

View File

@ -0,0 +1,30 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "objectio",
srcs = [
"interface.go",
"writer.go",
],
importpath = "github.com/pingcap/tidb/pkg/objstore/objectio",
visibility = ["//visibility:public"],
deps = [
"//pkg/objstore/compressedio",
"//pkg/objstore/recording",
"@com_github_pingcap_errors//:errors",
],
)
go_test(
name = "objectio_test",
timeout = "short",
srcs = ["writer_test.go"],
flaky = True,
shard_count = 3,
deps = [
"//pkg/objstore",
"//pkg/objstore/compressedio",
"@com_github_klauspost_compress//zstd",
"@com_github_stretchr_testify//require",
],
)

View File

@ -0,0 +1,35 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package objectio
import (
"context"
"io"
)
// Reader represents the streaming external file reader.
type Reader interface {
io.ReadSeekCloser
// GetFileSize returns the file size.
GetFileSize() (int64, error)
}
// Writer represents the streaming external file writer.
type Writer interface {
// Write writes to the buffer and uploads the chunk once it is filled
Write(ctx context.Context, p []byte) (int, error)
// Close writes the final chunk and completes the upload
Close(ctx context.Context) error
}
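
For orientation, a minimal sketch of programming against the new objectio interfaces; the copyObject helper and its 32 KiB chunk size are hypothetical and not part of this change:

```go
package example

import (
	"context"
	"io"

	"github.com/pingcap/tidb/pkg/objstore/objectio"
)

// copyObject streams the whole object from r into w in fixed-size chunks.
// The helper name and the 32 KiB chunk size are illustrative only.
func copyObject(ctx context.Context, w objectio.Writer, r objectio.Reader) (int64, error) {
	buf := make([]byte, 32*1024)
	var total int64
	for {
		n, rerr := r.Read(buf)
		if n > 0 {
			if _, werr := w.Write(ctx, buf[:n]); werr != nil {
				return total, werr
			}
			total += int64(n)
		}
		if rerr == io.EOF {
			return total, nil
		}
		if rerr != nil {
			return total, rerr
		}
	}
}
```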

View File

@ -0,0 +1,155 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package objectio
import (
"bytes"
"context"
"io"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/objstore/recording"
)
// EmptyFlusher is a Flusher that does nothing.
type EmptyFlusher struct{}
// Flush is a no-op.
func (*EmptyFlusher) Flush() error {
return nil
}
type interceptBuffer interface {
io.WriteCloser
compressedio.Flusher
Len() int
Cap() int
Bytes() []byte
Reset()
Compressed() bool
}
func newInterceptBuffer(chunkSize int, compressType compressedio.CompressType) interceptBuffer {
if compressType == compressedio.NoCompression {
return newPlainBuffer(chunkSize)
}
return compressedio.NewBuffer(chunkSize, compressType)
}
type plainBuffer struct {
*bytes.Buffer
}
func (*plainBuffer) Flush() error {
return nil
}
func (*plainBuffer) Close() error {
return nil
}
func (*plainBuffer) Compressed() bool {
return false
}
func newPlainBuffer(chunkSize int) *plainBuffer {
return &plainBuffer{bytes.NewBuffer(make([]byte, 0, chunkSize))}
}
// BufferedWriter buffers writes and uploads them to the underlying Writer chunk by chunk, optionally compressing the data.
type BufferedWriter struct {
buf interceptBuffer
writer Writer
accessRec *recording.AccessStats
}
// Write implements Writer.
func (u *BufferedWriter) Write(ctx context.Context, p []byte) (int, error) {
n, err := u.write0(ctx, p)
u.accessRec.RecWrite(n)
return n, errors.Trace(err)
}
func (u *BufferedWriter) write0(ctx context.Context, p []byte) (int, error) {
bytesWritten := 0
for u.buf.Len()+len(p) > u.buf.Cap() {
// We won't fit p in this chunk
// Is this chunk full?
chunkToFill := u.buf.Cap() - u.buf.Len()
if chunkToFill > 0 {
// It's not full so we write enough of p to fill it
prewrite := p[0:chunkToFill]
w, err := u.buf.Write(prewrite)
bytesWritten += w
if err != nil {
return bytesWritten, errors.Trace(err)
}
p = p[w:]
// keep buffering because the compressed data size may still be less than Cap - Len
if u.buf.Compressed() {
continue
}
}
_ = u.buf.Flush()
err := u.uploadChunk(ctx)
if err != nil {
return 0, errors.Trace(err)
}
}
w, err := u.buf.Write(p)
bytesWritten += w
return bytesWritten, errors.Trace(err)
}
func (u *BufferedWriter) uploadChunk(ctx context.Context) error {
if u.buf.Len() == 0 {
return nil
}
b := u.buf.Bytes()
u.buf.Reset()
_, err := u.writer.Write(ctx, b)
return errors.Trace(err)
}
// Close implements Writer.
func (u *BufferedWriter) Close(ctx context.Context) error {
u.buf.Close()
err := u.uploadChunk(ctx)
if err != nil {
return errors.Trace(err)
}
return u.writer.Close(ctx)
}
// GetWriter returns the underlying writer.
func (u *BufferedWriter) GetWriter() Writer {
return u.writer
}
// NewUploaderWriter wraps the Writer interface over an uploader.
func NewUploaderWriter(writer Writer, chunkSize int, compressType compressedio.CompressType) Writer {
return NewBufferedWriter(writer, chunkSize, compressType, nil)
}
// NewBufferedWriter is used to build a buffered writer.
func NewBufferedWriter(writer Writer, chunkSize int, compressType compressedio.CompressType, accessRec *recording.AccessStats) *BufferedWriter {
return &BufferedWriter{
writer: writer,
buf: newInterceptBuffer(chunkSize, compressType),
accessRec: accessRec,
}
}
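
A rough usage sketch of the exported BufferedWriter; memWriter, writeCompressed, and the 1 MiB chunk size are illustrative stand-ins rather than code from this change, and passing nil for accessRec mirrors what NewUploaderWriter does:

```go
package example

import (
	"context"

	"github.com/pingcap/tidb/pkg/objstore/compressedio"
	"github.com/pingcap/tidb/pkg/objstore/objectio"
)

// memWriter is a toy objectio.Writer that records every chunk it receives;
// it stands in for a real multipart uploader such as the S3 or GCS writer.
type memWriter struct {
	chunks [][]byte
}

func (m *memWriter) Write(_ context.Context, p []byte) (int, error) {
	m.chunks = append(m.chunks, append([]byte(nil), p...))
	return len(p), nil
}

func (*memWriter) Close(_ context.Context) error { return nil }

// writeCompressed wraps the uploader in a BufferedWriter so the payload is
// gzip-compressed and flushed to memWriter in roughly 1 MiB chunks.
func writeCompressed(ctx context.Context, data []byte) ([][]byte, error) {
	uploader := &memWriter{}
	w := objectio.NewBufferedWriter(uploader, 1<<20, compressedio.Gzip, nil)
	if _, err := w.Write(ctx, data); err != nil {
		return nil, err
	}
	if err := w.Close(ctx); err != nil {
		return nil, err
	}
	return uploader.chunks, nil
}
```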

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package objstore
package objectio_test
import (
"bytes"
@ -25,9 +25,36 @@ import (
"testing"
"github.com/klauspost/compress/zstd"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/stretchr/testify/require"
)
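// getStore parses uri into a backend, creates the corresponding storage, and returns the result of changeStoreFn applied to it.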
func getStore(t *testing.T, uri string, changeStoreFn func(s objstore.Storage) objstore.Storage) objstore.Storage {
t.Helper()
backend, err := objstore.ParseBackend(uri, nil)
require.NoError(t, err)
ctx := context.Background()
storage, err := objstore.Create(ctx, backend, true)
require.NoError(t, err)
return changeStoreFn(storage)
}
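// writeFile writes each string in lines to fileName through storage and closes the writer.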
func writeFile(t *testing.T, storage objstore.Storage, fileName string, lines []string) {
t.Helper()
ctx := context.Background()
writer, err := storage.Create(ctx, fileName, nil)
require.NoError(t, err)
for _, str := range lines {
p := []byte(str)
written, err2 := writer.Write(ctx, p)
require.Nil(t, err2)
require.Len(t, p, written)
}
err = writer.Close(ctx)
require.NoError(t, err)
}
func TestExternalFileWriter(t *testing.T) {
dir := t.TempDir()
@ -37,22 +64,11 @@ func TestExternalFileWriter(t *testing.T) {
}
testFn := func(test *testcase, t *testing.T) {
t.Log(test.name)
backend, err := ParseBackend("local://"+filepath.ToSlash(dir), nil)
require.NoError(t, err)
ctx := context.Background()
storage, err := Create(ctx, backend, true)
require.NoError(t, err)
storage := getStore(t, "local://"+filepath.ToSlash(dir), func(s objstore.Storage) objstore.Storage {
return s
})
fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt"
writer, err := storage.Create(ctx, fileName, nil)
require.NoError(t, err)
for _, str := range test.content {
p := []byte(str)
written, err2 := writer.Write(ctx, p)
require.Nil(t, err2)
require.Len(t, p, written)
}
err = writer.Close(ctx)
require.NoError(t, err)
writeFile(t, storage, fileName, test.content)
content, err := os.ReadFile(filepath.Join(dir, fileName))
require.NoError(t, err)
require.Equal(t, strings.Join(test.content, ""), string(content))
@ -107,33 +123,21 @@ func TestCompressReaderWriter(t *testing.T) {
type testcase struct {
name string
content []string
compressType CompressType
compressType compressedio.CompressType
}
testFn := func(test *testcase, t *testing.T) {
t.Log(test.name)
backend, err := ParseBackend("local://"+filepath.ToSlash(dir), nil)
require.NoError(t, err)
ctx := context.Background()
storage, err := Create(ctx, backend, true)
require.NoError(t, err)
storage = WithCompression(storage, test.compressType, DecompressConfig{})
suffix := createSuffixString(test.compressType)
suffix := test.compressType.FileSuffix()
fileName := strings.ReplaceAll(test.name, " ", "-") + suffix
writer, err := storage.Create(ctx, fileName, nil)
require.NoError(t, err)
for _, str := range test.content {
p := []byte(str)
written, err2 := writer.Write(ctx, p)
require.Nil(t, err2)
require.Len(t, p, written)
}
err = writer.Close(ctx)
require.NoError(t, err)
storage := getStore(t, "local://"+filepath.ToSlash(dir), func(s objstore.Storage) objstore.Storage {
return objstore.WithCompression(s, test.compressType, compressedio.DecompressConfig{})
})
writeFile(t, storage, fileName, test.content)
// make sure compressed file is written correctly
file, err := os.Open(filepath.Join(dir, fileName))
require.NoError(t, err)
r, err := newCompressReader(test.compressType, DecompressConfig{}, file)
r, err := compressedio.NewReader(test.compressType, compressedio.DecompressConfig{}, file)
require.NoError(t, err)
var bf bytes.Buffer
_, err = bf.ReadFrom(r)
@ -141,6 +145,7 @@ func TestCompressReaderWriter(t *testing.T) {
require.Equal(t, strings.Join(test.content, ""), bf.String())
// test withCompression Open
ctx := context.Background()
r, err = storage.Open(ctx, fileName, nil)
require.NoError(t, err)
content, err := io.ReadAll(r)
@ -149,7 +154,7 @@ func TestCompressReaderWriter(t *testing.T) {
require.Nil(t, file.Close())
}
compressTypeArr := []CompressType{Gzip, Snappy, Zstd}
compressTypeArr := []compressedio.CompressType{compressedio.Gzip, compressedio.Snappy, compressedio.Zstd}
tests := []testcase{
{
@ -196,7 +201,7 @@ func TestNewCompressReader(t *testing.T) {
// default cfg(decode asynchronously)
prevRoutineCnt := runtime.NumGoroutine()
r, err := newCompressReader(Zstd, DecompressConfig{}, bytes.NewReader(compressedData))
r, err := compressedio.NewReader(compressedio.Zstd, compressedio.DecompressConfig{}, bytes.NewReader(compressedData))
currRoutineCnt := runtime.NumGoroutine()
require.NoError(t, err)
require.Greater(t, currRoutineCnt, prevRoutineCnt)
@ -206,8 +211,8 @@ func TestNewCompressReader(t *testing.T) {
// sync decode
prevRoutineCnt = runtime.NumGoroutine()
config := DecompressConfig{ZStdDecodeConcurrency: 1}
r, err = newCompressReader(Zstd, config, bytes.NewReader(compressedData))
config := compressedio.DecompressConfig{ZStdDecodeConcurrency: 1}
r, err = compressedio.NewReader(compressedio.Zstd, config, bytes.NewReader(compressedData))
require.NoError(t, err)
currRoutineCnt = runtime.NumGoroutine()
require.Equal(t, prevRoutineCnt, currRoutineCnt)

View File

@ -52,6 +52,8 @@ import (
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/pingcap/tidb/pkg/objstore/recording"
"github.com/pingcap/tidb/pkg/util/injectfailpoint"
"github.com/pingcap/tidb/pkg/util/prefetch"
@ -1024,7 +1026,7 @@ func (rs *S3Storage) URI() string {
}
// Open a Reader by file path.
func (rs *S3Storage) Open(ctx context.Context, path string, o *ReaderOption) (FileReader, error) {
func (rs *S3Storage) Open(ctx context.Context, path string, o *ReaderOption) (objectio.Reader, error) {
start := int64(0)
end := int64(0)
prefetchSize := 0
@ -1318,7 +1320,7 @@ func (r *s3ObjectReader) GetFileSize() (int64, error) {
}
// createUploader creates a multipart upload request.
func (rs *S3Storage) createUploader(ctx context.Context, name string) (FileWriter, error) {
func (rs *S3Storage) createUploader(ctx context.Context, name string) (objectio.Writer, error) {
input := &s3.CreateMultipartUploadInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + name),
@ -1369,8 +1371,8 @@ func (s *s3ObjectWriter) Close(_ context.Context) error {
}
// Create creates a multipart upload request.
func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (FileWriter, error) {
var uploader FileWriter
func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (objectio.Writer, error) {
var uploader objectio.Writer
var err error
if option == nil || option.Concurrency <= 1 {
uploader, err = rs.createUploader(ctx, name)
@ -1407,7 +1409,7 @@ func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOpti
if option != nil && option.PartSize > 0 {
bufSize = int(option.PartSize)
}
uploaderWriter := newBufferedWriter(uploader, bufSize, NoCompression, rs.accessRec)
uploaderWriter := objectio.NewBufferedWriter(uploader, bufSize, compressedio.NoCompression, rs.accessRec)
return uploaderWriter, nil
}

View File

@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/pkg/objstore/objectio"
"github.com/pingcap/tidb/pkg/objstore/recording"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
@ -106,14 +107,6 @@ type Uploader interface {
CompleteUpload(ctx context.Context) error
}
// Writer is like io.Writer but with Context, create a new writer on top of Uploader with NewUploaderWriter.
type Writer interface {
// Write writes to buffer and if chunk is filled will upload it
Write(ctx context.Context, p []byte) (int, error)
// Close writes final chunk and completes the upload
Close(ctx context.Context) error
}
// WriterOption writer option.
type WriterOption struct {
Concurrency int
@ -154,7 +147,7 @@ type Storage interface {
DeleteFile(ctx context.Context, name string) error
// Open a Reader by file path. path is relative path to storage base path.
// Some implementations use the given ctx as the inner context of the reader.
Open(ctx context.Context, path string, option *ReaderOption) (FileReader, error)
Open(ctx context.Context, path string, option *ReaderOption) (objectio.Reader, error)
// DeleteFiles delete the files in storage
DeleteFiles(ctx context.Context, names []string) error
// WalkDir traverse all the files in a dir.
@ -171,28 +164,13 @@ type Storage interface {
// Create opens a file writer by path. path is relative path to storage base
// path. The old file under the same path will be overwritten. Currently only
// the S3 implementation honors WriterOption.
Create(ctx context.Context, path string, option *WriterOption) (FileWriter, error)
Create(ctx context.Context, path string, option *WriterOption) (objectio.Writer, error)
// Rename file name from oldFileName to newFileName
Rename(ctx context.Context, oldFileName, newFileName string) error
// Close release the resources of the storage.
Close()
}
// FileReader represents the streaming external file reader.
type FileReader interface {
io.ReadSeekCloser
// GetFileSize returns the file size.
GetFileSize() (int64, error)
}
// FileWriter represents the streaming external file writer.
type FileWriter interface {
// Write writes to buffer and if chunk is filled will upload it
Write(ctx context.Context, p []byte) (int, error)
// Close writes final chunk and completes the upload
Close(ctx context.Context) error
}
// Options are backend-independent options provided to New.
type Options struct {
// SendCredentials marks whether to send credentials downstream.
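
As an illustration only (the readAll helper is hypothetical), callers of the refactored Storage interface now receive objectio types from Open and Create:

```go
package example

import (
	"context"
	"io"

	"github.com/pingcap/tidb/pkg/objstore"
)

// readAll opens path through the Storage interface, whose Open now returns an
// objectio.Reader, and reads the whole object into memory.
func readAll(ctx context.Context, store objstore.Storage, path string) ([]byte, error) {
	r, err := store.Open(ctx, path, nil)
	if err != nil {
		return nil, err
	}
	//nolint: errcheck
	defer r.Close()
	return io.ReadAll(r)
}
```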

View File

@ -1,308 +0,0 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package objstore
import (
"bytes"
"context"
"io"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/objstore/recording"
"go.uber.org/zap"
)
// CompressType represents the type of compression.
type CompressType uint8
const (
// NoCompression won't compress given bytes.
NoCompression CompressType = iota
// Gzip will compress given bytes in gzip format.
Gzip
// Snappy will compress given bytes in snappy format.
Snappy
// Zstd will compress given bytes in zstd format.
Zstd
)
// DecompressConfig is the config used for decompression.
type DecompressConfig struct {
// ZStdDecodeConcurrency only used for ZStd decompress, see WithDecoderConcurrency.
// if not 1, ZStd will decode file asynchronously.
ZStdDecodeConcurrency int
}
type flusher interface {
Flush() error
}
type emptyFlusher struct{}
func (*emptyFlusher) Flush() error {
return nil
}
type interceptBuffer interface {
io.WriteCloser
flusher
Len() int
Cap() int
Bytes() []byte
Reset()
Compressed() bool
}
func createSuffixString(compressType CompressType) string {
txtSuffix := ".txt"
switch compressType {
case Gzip:
txtSuffix += ".gz"
case Snappy:
txtSuffix += ".snappy"
case Zstd:
txtSuffix += ".zst"
default:
return ""
}
return txtSuffix
}
func newInterceptBuffer(chunkSize int, compressType CompressType) interceptBuffer {
if compressType == NoCompression {
return newNoCompressionBuffer(chunkSize)
}
return newSimpleCompressBuffer(chunkSize, compressType)
}
func newCompressWriter(compressType CompressType, w io.Writer) simpleCompressWriter {
switch compressType {
case Gzip:
return gzip.NewWriter(w)
case Snappy:
return snappy.NewBufferedWriter(w)
case Zstd:
newWriter, err := zstd.NewWriter(w)
if err != nil {
log.Warn("Met error when creating new writer for Zstd type file", zap.Error(err))
}
return newWriter
default:
return nil
}
}
func newCompressReader(compressType CompressType, cfg DecompressConfig, r io.Reader) (io.Reader, error) {
switch compressType {
case Gzip:
return gzip.NewReader(r)
case Snappy:
return snappy.NewReader(r), nil
case Zstd:
options := []zstd.DOption{}
if cfg.ZStdDecodeConcurrency > 0 {
options = append(options, zstd.WithDecoderConcurrency(cfg.ZStdDecodeConcurrency))
}
return zstd.NewReader(r, options...)
default:
return nil, nil
}
}
type noCompressionBuffer struct {
*bytes.Buffer
}
func (*noCompressionBuffer) Flush() error {
return nil
}
func (*noCompressionBuffer) Close() error {
return nil
}
func (*noCompressionBuffer) Compressed() bool {
return false
}
func newNoCompressionBuffer(chunkSize int) *noCompressionBuffer {
return &noCompressionBuffer{bytes.NewBuffer(make([]byte, 0, chunkSize))}
}
type simpleCompressWriter interface {
io.WriteCloser
flusher
}
type simpleCompressBuffer struct {
*bytes.Buffer
compressWriter simpleCompressWriter
cap int
}
func (b *simpleCompressBuffer) Write(p []byte) (int, error) {
written, err := b.compressWriter.Write(p)
return written, errors.Trace(err)
}
func (b *simpleCompressBuffer) Len() int {
return b.Buffer.Len()
}
func (b *simpleCompressBuffer) Cap() int {
return b.cap
}
func (b *simpleCompressBuffer) Reset() {
b.Buffer.Reset()
}
func (b *simpleCompressBuffer) Flush() error {
return b.compressWriter.Flush()
}
func (b *simpleCompressBuffer) Close() error {
return b.compressWriter.Close()
}
func (*simpleCompressBuffer) Compressed() bool {
return true
}
func newSimpleCompressBuffer(chunkSize int, compressType CompressType) *simpleCompressBuffer {
bf := bytes.NewBuffer(make([]byte, 0, chunkSize))
return &simpleCompressBuffer{
Buffer: bf,
cap: chunkSize,
compressWriter: newCompressWriter(compressType, bf),
}
}
type bufferedWriter struct {
buf interceptBuffer
writer FileWriter
accessRec *recording.AccessStats
}
func (u *bufferedWriter) Write(ctx context.Context, p []byte) (int, error) {
n, err := u.write0(ctx, p)
u.accessRec.RecWrite(n)
return n, errors.Trace(err)
}
func (u *bufferedWriter) write0(ctx context.Context, p []byte) (int, error) {
bytesWritten := 0
for u.buf.Len()+len(p) > u.buf.Cap() {
// We won't fit p in this chunk
// Is this chunk full?
chunkToFill := u.buf.Cap() - u.buf.Len()
if chunkToFill > 0 {
// It's not full so we write enough of p to fill it
prewrite := p[0:chunkToFill]
w, err := u.buf.Write(prewrite)
bytesWritten += w
if err != nil {
return bytesWritten, errors.Trace(err)
}
p = p[w:]
// continue buf because compressed data size may be less than Cap - Len
if u.buf.Compressed() {
continue
}
}
_ = u.buf.Flush()
err := u.uploadChunk(ctx)
if err != nil {
return 0, errors.Trace(err)
}
}
w, err := u.buf.Write(p)
bytesWritten += w
return bytesWritten, errors.Trace(err)
}
func (u *bufferedWriter) uploadChunk(ctx context.Context) error {
if u.buf.Len() == 0 {
return nil
}
b := u.buf.Bytes()
u.buf.Reset()
_, err := u.writer.Write(ctx, b)
return errors.Trace(err)
}
func (u *bufferedWriter) Close(ctx context.Context) error {
u.buf.Close()
err := u.uploadChunk(ctx)
if err != nil {
return errors.Trace(err)
}
return u.writer.Close(ctx)
}
// NewUploaderWriter wraps the Writer interface over an uploader.
func NewUploaderWriter(writer FileWriter, chunkSize int, compressType CompressType) FileWriter {
return newBufferedWriter(writer, chunkSize, compressType, nil)
}
// newBufferedWriter is used to build a buffered writer.
func newBufferedWriter(writer FileWriter, chunkSize int, compressType CompressType, accessRec *recording.AccessStats) *bufferedWriter {
return &bufferedWriter{
writer: writer,
buf: newInterceptBuffer(chunkSize, compressType),
accessRec: accessRec,
}
}
// BytesWriter is a Writer implementation on top of bytes.Buffer that is useful for testing.
type BytesWriter struct {
buf *bytes.Buffer
}
// Write delegates to bytes.Buffer.
func (u *BytesWriter) Write(_ context.Context, p []byte) (int, error) {
return u.buf.Write(p)
}
// Close delegates to bytes.Buffer.
func (*BytesWriter) Close(_ context.Context) error {
// noop
return nil
}
// Bytes delegates to bytes.Buffer.
func (u *BytesWriter) Bytes() []byte {
return u.buf.Bytes()
}
// String delegates to bytes.Buffer.
func (u *BytesWriter) String() string {
return u.buf.String()
}
// Reset delegates to bytes.Buffer.
func (u *BytesWriter) Reset() {
u.buf.Reset()
}
// NewBufferWriter creates a Writer that simply writes to a buffer (useful for testing).
func NewBufferWriter() *BytesWriter {
return &BytesWriter{buf: &bytes.Buffer{}}
}