Files
tidb/pkg/objstore/memstore_test.go

270 lines
8.4 KiB
Go

// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package objstore
import (
"bytes"
"context"
"io"
"sync"
"testing"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/objstore/storeapi"
"github.com/stretchr/testify/require"
)
// TestMemStoreBasic walks through the basic operations of the in-memory
// store on a single goroutine: whole-file write/read/delete, writing through
// a writer handle, reading through two independent reader handles, and
// renaming files.
func TestMemStoreBasic(t *testing.T) {
	store := NewMemStorage()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// write and then read
	require.NoError(t, store.WriteFile(ctx, "/hello.txt", []byte("hello world")))
	fileContent, err := store.ReadFile(ctx, "/hello.txt")
	require.NoError(t, err)
	// Contents are compared as strings so that a mismatch prints both values
	// and a nil slice still equals an empty one.
	require.Equal(t, "hello world", string(fileContent))

	// write the same file and then read
	require.NoError(t, store.WriteFile(ctx, "/hello.txt", []byte("hello world 2")))
	fileContent, err = store.ReadFile(ctx, "/hello.txt")
	require.NoError(t, err)
	require.Equal(t, "hello world 2", string(fileContent))

	// delete the file and then delete & read: both must fail on a missing file
	require.NoError(t, store.DeleteFile(ctx, "/hello.txt"))
	require.Error(t, store.DeleteFile(ctx, "/hello.txt"))
	_, err = store.ReadFile(ctx, "/hello.txt")
	require.Error(t, err)

	// create a writer to write
	w, err := store.Create(ctx, "/hello.txt", nil)
	require.NoError(t, err)
	_, err = w.Write(ctx, []byte("hello world 3"))
	require.NoError(t, err)

	// read the file content before writer closing: the file is readable but
	// the written bytes are not visible yet
	fileContent, err = store.ReadFile(ctx, "/hello.txt")
	require.NoError(t, err)
	require.Empty(t, fileContent)
	require.NoError(t, w.Close(ctx))

	// read the file content after writer closing
	fileContent, err = store.ReadFile(ctx, "/hello.txt")
	require.NoError(t, err)
	require.Equal(t, "hello world 3", string(fileContent))

	// simultaneously create two readers on the same file
	r, err := store.Open(ctx, "/hello.txt", nil)
	require.NoError(t, err)
	r2, err := store.Open(ctx, "/hello.txt", nil)
	require.NoError(t, err)
	fileContent, err = io.ReadAll(r)
	require.NoError(t, err)
	require.Equal(t, "hello world 3", string(fileContent))
	require.NoError(t, r.Close())

	// operations after close
	_, err = r.Read(make([]byte, 3))
	require.Error(t, err)

	// delete the file, but there is an existing reader on this file: the
	// reader must still be able to seek and read the old content
	require.NoError(t, store.DeleteFile(ctx, "/hello.txt"))
	_, err = r2.Seek(5, io.SeekStart)
	require.NoError(t, err)
	fileContent, err = io.ReadAll(r2)
	require.NoError(t, err)
	require.Equal(t, " world 3", string(fileContent))
	// Release the second reader as well instead of leaving it open.
	require.NoError(t, r2.Close())

	// rename the file
	require.NoError(t, store.WriteFile(ctx, "/hello.txt", []byte("hello world 3")))
	require.NoError(t, store.WriteFile(ctx, "/hello2.txt", []byte("hello world 2")))
	exists, err := store.FileExists(ctx, "/hello.txt")
	require.NoError(t, err)
	require.True(t, exists)
	exists, err = store.FileExists(ctx, "/hello2.txt")
	require.NoError(t, err)
	require.True(t, exists)

	// renaming a missing source fails; renaming onto an existing target succeeds
	require.Error(t, store.Rename(ctx, "/NOT_EXIST.txt", "/NEW_FILE.txt"))
	require.NoError(t, store.Rename(ctx, "/hello2.txt", "/hello3.txt"))
	require.NoError(t, store.Rename(ctx, "/hello.txt", "/hello3.txt"))

	// only the final rename target is left
	exists, err = store.FileExists(ctx, "/hello.txt")
	require.NoError(t, err)
	require.False(t, exists)
	exists, err = store.FileExists(ctx, "/hello2.txt")
	require.NoError(t, err)
	require.False(t, exists)
	exists, err = store.FileExists(ctx, "/hello3.txt")
	require.NoError(t, err)
	require.True(t, exists)
}
// iterFileInfo records one file visited by a WalkDir callback in the tests
// of this file.
type iterFileInfo struct {
// Name is the file name handed to the callback.
Name string
// Size is the file size handed to the callback.
Size int64
// Content is the data read back from the store for Name.
Content []byte
}
// TestMemStoreWalkDir fills the store with a fixed set of files and checks
// which of them WalkDir visits with an empty option, with a SubDir filter and
// with an ObjPrefix filter.
func TestMemStoreWalkDir(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	store := NewMemStorage()

	allTestFiles := map[string][]byte{
		"/hello.txt":     []byte("hello world"),
		"/hello2.txt":    []byte("hello world 2"),
		"/aaa/hello.txt": []byte("aaa: hello world"),
		"/aaa/world.txt": []byte("aaa: world"),
		"/dummy.txt":     []byte("dummy"),
	}
	for name, data := range allTestFiles {
		require.Nil(t, store.WriteFile(ctx, name, data))
	}

	// walkAndVerify runs WalkDir with opt, reading every visited file back
	// from the store, and then checks that the visited files match want in
	// count, name, reported size and content.
	walkAndVerify := func(opt *storeapi.WalkOption, want map[string][]byte) {
		visited := []*iterFileInfo{}
		collect := func(fileName string, fileSize int64) error {
			data, readErr := store.ReadFile(ctx, fileName)
			if readErr != nil {
				return readErr
			}
			visited = append(visited, &iterFileInfo{
				Name:    fileName,
				Size:    fileSize,
				Content: data,
			})
			return nil
		}
		require.Nil(t, store.WalkDir(ctx, opt, collect))
		require.Equal(t, len(want), len(visited))
		for _, got := range visited {
			wantContent, found := want[got.Name]
			require.True(t, found, got.Name)
			require.Equal(t, int64(len(got.Content)), got.Size, got.Name)
			require.True(t, bytes.Equal(wantContent, got.Content), got.Name)
		}
	}

	// An empty option visits every file.
	walkAndVerify(&storeapi.WalkOption{}, allTestFiles)

	// SubDir restricts the walk to the files below that directory.
	walkAndVerify(&storeapi.WalkOption{
		SubDir: "/aaa",
	}, map[string][]byte{
		"/aaa/hello.txt": allTestFiles["/aaa/hello.txt"],
		"/aaa/world.txt": allTestFiles["/aaa/world.txt"],
	})

	// ObjPrefix selects the files whose object name starts with the prefix.
	walkAndVerify(&storeapi.WalkOption{
		ObjPrefix: "hello",
	}, map[string][]byte{
		"/hello.txt":     allTestFiles["/hello.txt"],
		"/hello2.txt":    allTestFiles["/hello2.txt"],
		"/aaa/hello.txt": allTestFiles["/aaa/hello.txt"],
	})
}
// TestMemStoreManipulateBytes checks that the stored data is isolated from
// the caller's slices: mutating the slice passed to WriteFile, or the slice
// returned by ReadFile, must not change what the store holds.
func TestMemStoreManipulateBytes(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	store := NewMemStorage()

	want := "aaa1"
	// stored peeks directly at the bytes the store currently holds for the file.
	stored := func() string {
		return string(*store.dataStore["/aaa.txt"].Data.Load())
	}

	// Mutate the input slice after it has been written.
	input := []byte(want)
	require.Nil(t, store.WriteFile(ctx, "/aaa.txt", input))
	input[3] = '2'
	require.Equal(t, want, stored())

	// Mutate the slice handed back by a read.
	got, err := store.ReadFile(ctx, "/aaa.txt")
	require.Nil(t, err)
	require.Equal(t, want, string(got))
	got[3] = '2'
	require.Equal(t, want, stored())
}
// TestMemStoreWriteDuringWalkDir runs a deliberately slow WalkDir in one
// goroutine while another goroutine writes a new file and deletes a file the
// walk has not visited yet. It checks that the write and delete return within
// one second (i.e. they are not held up for the duration of the walk), and
// that the deleted file is never handed to the walk callback.
func TestMemStoreWriteDuringWalkDir(t *testing.T) {
	store := NewMemStorage()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// deleteFileName is set by the writer goroutine before it sends on
	// writeErrCh and is read here only after that result has been received.
	var deleteFileName string
	allTestFiles := map[string][]byte{
		"/hello1.txt": []byte("hello world 1"),
		"/hello2.txt": []byte("hello world 2"),
		"/hello3.txt": []byte("hello world 3"),
	}
	// remainFilesMap holds the files the walk callback has not visited yet.
	var remainFilesMap sync.Map
	for fileName, content := range allTestFiles {
		require.NoError(t, store.WriteFile(ctx, fileName, content))
		remainFilesMap.Store(fileName, true)
	}

	pendingCh := make(chan struct{})
	// Capacity 1 lets each goroutine deliver its result and exit even when
	// the test has already failed and nobody receives from the channel.
	walkErrCh := make(chan error, 1)
	writeErrCh := make(chan error, 1)

	// Both goroutines log through t, which must not happen after the test has
	// completed, so hold the test open until they are done. Deferred calls
	// also run when a require assertion aborts the test.
	var wg sync.WaitGroup
	wg.Add(2)
	defer wg.Wait()

	go func() {
		defer wg.Done()
		defer close(walkErrCh)
		<-pendingCh
		walkErrCh <- store.WalkDir(ctx, nil, func(fileName string, size int64) error {
			t.Logf("iterating: %s", fileName)
			remainFilesMap.Delete(fileName)
			// Keep the walk slow so the writer goroutine acts in the middle of it.
			time.Sleep(1 * time.Second)
			return nil
		})
	}()
	go func() {
		defer wg.Done()
		defer close(writeErrCh)
		<-pendingCh
		time.Sleep(100 * time.Millisecond) // set some lag, to let 'WalkDir' go-routine run first
		err := store.WriteFile(ctx, "/hello4.txt", []byte("hello world4"))
		if err != nil {
			writeErrCh <- err
			return
		}
		t.Log("new file written")
		// Pick any one file that the walk has not visited so far.
		remainFilesMap.Range(func(k any, v any) bool {
			deleteFileName = k.(string)
			return false
		})
		writeErrCh <- store.DeleteFile(ctx, deleteFileName)
		t.Logf("%s deleted", deleteFileName)
	}()
	// Release both goroutines at the same time.
	close(pendingCh)

	// see how long does the write returns
	var err error
	select {
	case <-time.After(1 * time.Second):
		err = errors.New("writing file timeout")
	case err = <-writeErrCh:
		// continue on
	}
	require.NoError(t, err)
	require.NoError(t, <-walkErrCh)

	// The callback removes every file it visits from remainFilesMap, so the
	// deleted file still being present means the walk never visited it.
	_, ok := remainFilesMap.Load(deleteFileName)
	require.True(t, ok)
}