// tidb/pkg/objstore/objectio/writer_test.go

// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package objectio_test

import (
	"bytes"
	"context"
	"io"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"testing"

	"github.com/klauspost/compress/zstd"
	"github.com/pingcap/tidb/pkg/objstore"
	"github.com/pingcap/tidb/pkg/objstore/compressedio"
	"github.com/pingcap/tidb/pkg/objstore/storeapi"
	"github.com/stretchr/testify/require"
)
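
// getStore parses uri into a storage backend, creates the storage, and
// returns the result of changeStoreFn so callers can wrap the store
// (for example with compression) before using it.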
func getStore(t *testing.T, uri string, changeStoreFn func(s storeapi.Storage) storeapi.Storage) storeapi.Storage {
	t.Helper()
	backend, err := objstore.ParseBackend(uri, nil)
	require.NoError(t, err)
	ctx := context.Background()
	storage, err := objstore.Create(ctx, backend, true)
	require.NoError(t, err)
	return changeStoreFn(storage)
}
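
// writeFile creates fileName on the given storage, writes each element of
// lines, checks that every write reports the expected byte count, and closes
// the writer.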
func writeFile(t *testing.T, storage storeapi.Storage, fileName string, lines []string) {
	t.Helper()
	ctx := context.Background()
	writer, err := storage.Create(ctx, fileName, nil)
	require.NoError(t, err)
	for _, str := range lines {
		p := []byte(str)
		written, err2 := writer.Write(ctx, p)
		require.Nil(t, err2)
		require.Len(t, p, written)
	}
	err = writer.Close(ctx)
	require.NoError(t, err)
}
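
// TestExternalFileWriter writes each test case through a local:// store and
// verifies that the file on disk equals the concatenation of the chunks.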
func TestExternalFileWriter(t *testing.T) {
	dir := t.TempDir()
	type testcase struct {
		name    string
		content []string
	}
	testFn := func(test *testcase, t *testing.T) {
		t.Log(test.name)
		storage := getStore(t, "local://"+filepath.ToSlash(dir), func(s storeapi.Storage) storeapi.Storage {
			return s
		})
		fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt"
		writeFile(t, storage, fileName, test.content)
		content, err := os.ReadFile(filepath.Join(dir, fileName))
		require.NoError(t, err)
		require.Equal(t, strings.Join(test.content, ""), string(content))
	}
	tests := []testcase{
		{
			name:    "short and sweet",
			content: []string{"hi"},
		},
		{
			name: "long text small chunks",
			content: []string{
				"hello world",
				"hello world",
				"hello world",
				"hello world",
				"hello world",
				"hello world",
			},
		},
		{
			name: "long text medium chunks",
			content: []string{
				"hello world",
				"hello world",
				"hello world",
				"hello world",
				"hello world",
				"hello world",
			},
		},
		{
			name: "long text large chunks",
			content: []string{
				"hello world",
				"hello world",
				"hello world",
				"hello world",
				"hello world",
				"hello world",
			},
		},
	}
	for i := range tests {
		testFn(&tests[i], t)
	}
}
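
// TestCompressReaderWriter writes each test case through a compression-wrapped
// store for Gzip, Snappy, and Zstd, then checks the data twice: by decompressing
// the raw file directly and by reading it back through storage.Open.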
func TestCompressReaderWriter(t *testing.T) {
	dir := t.TempDir()
	type testcase struct {
		name         string
		content      []string
		compressType compressedio.CompressType
	}
	testFn := func(test *testcase, t *testing.T) {
		t.Log(test.name)
		suffix := test.compressType.FileSuffix()
		fileName := strings.ReplaceAll(test.name, " ", "-") + suffix
		storage := getStore(t, "local://"+filepath.ToSlash(dir), func(s storeapi.Storage) storeapi.Storage {
			return objstore.WithCompression(s, test.compressType, compressedio.DecompressConfig{})
		})
		writeFile(t, storage, fileName, test.content)

		// make sure compressed file is written correctly
		file, err := os.Open(filepath.Join(dir, fileName))
		require.NoError(t, err)
		r, err := compressedio.NewReader(test.compressType, compressedio.DecompressConfig{}, file)
		require.NoError(t, err)
		var bf bytes.Buffer
		_, err = bf.ReadFrom(r)
		require.NoError(t, err)
		require.Equal(t, strings.Join(test.content, ""), bf.String())

		// test withCompression Open
		ctx := context.Background()
		r, err = storage.Open(ctx, fileName, nil)
		require.NoError(t, err)
		content, err := io.ReadAll(r)
		require.NoError(t, err)
		require.Equal(t, strings.Join(test.content, ""), string(content))
		require.Nil(t, file.Close())
	}
	compressTypeArr := []compressedio.CompressType{compressedio.Gzip, compressedio.Snappy, compressedio.Zstd}
	tests := []testcase{
		{
			name: "long text medium chunks",
			content: []string{
				"hello world",
				"hello world",
				"hello world",
				"hello world",
				"hello world",
				"hello world",
			},
		},
		{
			name: "long text large chunks",
			content: []string{
				"hello world",
				"hello world",
				"hello world",
				"hello world",
				"hello world",
				"hello world",
			},
		},
	}
	for i := range tests {
		for _, compressType := range compressTypeArr {
			tests[i].compressType = compressType
			testFn(&tests[i], t)
		}
	}
}
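
// TestNewCompressReader checks the zstd decode mode: with the default config
// the reader starts extra decode goroutines, while ZStdDecodeConcurrency: 1
// keeps decoding synchronous; both modes must return the original payload.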
func TestNewCompressReader(t *testing.T) {
	var buf bytes.Buffer
	var w io.WriteCloser
	var err error
	w, err = zstd.NewWriter(&buf)
	require.NoError(t, err)
	_, err = w.Write([]byte("data"))
	require.NoError(t, err)
	require.NoError(t, w.Close())
	compressedData := buf.Bytes()

	// default cfg (decode asynchronously)
	prevRoutineCnt := runtime.NumGoroutine()
	r, err := compressedio.NewReader(compressedio.Zstd, compressedio.DecompressConfig{}, bytes.NewReader(compressedData))
	currRoutineCnt := runtime.NumGoroutine()
	require.NoError(t, err)
	require.Greater(t, currRoutineCnt, prevRoutineCnt)
	allData, err := io.ReadAll(r)
	require.NoError(t, err)
	require.Equal(t, "data", string(allData))

	// sync decode
	prevRoutineCnt = runtime.NumGoroutine()
	config := compressedio.DecompressConfig{ZStdDecodeConcurrency: 1}
	r, err = compressedio.NewReader(compressedio.Zstd, config, bytes.NewReader(compressedData))
	require.NoError(t, err)
	currRoutineCnt = runtime.NumGoroutine()
	require.Equal(t, prevRoutineCnt, currRoutineCnt)
	allData, err = io.ReadAll(r)
	require.NoError(t, err)
	require.Equal(t, "data", string(allData))
}