globalsort: impl onDup for onefile writer (#60903)

ref pingcap/tidb#60621
This commit is contained in:
D3Hunter
2025-04-29 11:54:05 +08:00
committed by GitHub
parent 4ee12aea5a
commit 033c23d9aa
10 changed files with 423 additions and 133 deletions

View File

@ -138,7 +138,7 @@ func writeExternalOneFile(s *writeTestSuite) {
}
writer := builder.BuildOneFile(
s.store, filePath, "writerID")
intest.AssertNoError(writer.Init(ctx, 20*1024*1024))
writer.InitPartSizeAndLogger(ctx, 20*1024*1024)
var minKey, maxKey []byte
key, val, _ := s.source.next()
@ -697,10 +697,9 @@ func TestReadAllData(t *testing.T) {
eg.Go(func() error {
fileName := fmt.Sprintf("/test%d", fileIdx)
writer := NewWriterBuilder().BuildOneFile(store, fileName, "writerID")
err := writer.Init(ctx, 5*1024*1024)
require.NoError(t, err)
writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
key := []byte(fmt.Sprintf("key0%d", fileIdx))
err = writer.WriteRow(ctx, key, val)
err := writer.WriteRow(ctx, key, val)
require.NoError(t, err)
// write some extra data that is greater than readRangeEnd
@ -720,8 +719,7 @@ func TestReadAllData(t *testing.T) {
eg.Go(func() error {
fileName := fmt.Sprintf("/test%d", fileIdx)
writer := NewWriterBuilder().BuildOneFile(store, fileName, "writerID")
err := writer.Init(ctx, 5*1024*1024)
require.NoError(t, err)
writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
kvSize := 0
keyIdx := 0
@ -729,12 +727,12 @@ func TestReadAllData(t *testing.T) {
key := []byte(fmt.Sprintf("key%06d_%d", keyIdx, fileIdx))
keyIdx++
kvSize += len(key) + len(val)
err = writer.WriteRow(ctx, key, val)
err := writer.WriteRow(ctx, key, val)
require.NoError(t, err)
}
// write some extra data that is greater than readRangeEnd
err = writer.WriteRow(ctx, keyAfterRange, val)
err := writer.WriteRow(ctx, keyAfterRange, val)
require.NoError(t, err)
err = writer.WriteRow(ctx, keyAfterRange2, make([]byte, 300*1024))
require.NoError(t, err)
@ -749,8 +747,7 @@ func TestReadAllData(t *testing.T) {
eg.Go(func() error {
fileName := fmt.Sprintf("/test%d", fileIdx)
writer := NewWriterBuilder().BuildOneFile(store, fileName, "writerID")
err := writer.Init(ctx, 5*1024*1024)
require.NoError(t, err)
writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
kvSize := 0
keyIdx := 0
@ -758,12 +755,12 @@ func TestReadAllData(t *testing.T) {
key := []byte(fmt.Sprintf("key%09d_%d", keyIdx, fileIdx))
keyIdx++
kvSize += len(key) + len(val)
err = writer.WriteRow(ctx, key, val)
err := writer.WriteRow(ctx, key, val)
require.NoError(t, err)
}
// write some extra data that is greater than readRangeEnd
err = writer.WriteRow(ctx, keyAfterRange, val)
err := writer.WriteRow(ctx, keyAfterRange, val)
require.NoError(t, err)
err = writer.WriteRow(ctx, keyAfterRange2, make([]byte, 900*1024))
require.NoError(t, err)
@ -776,8 +773,7 @@ func TestReadAllData(t *testing.T) {
for ; fileIdx < 2091; fileIdx++ {
fileName := fmt.Sprintf("/test%d", fileIdx)
writer := NewWriterBuilder().BuildOneFile(store, fileName, "writerID")
err := writer.Init(ctx, 5*1024*1024)
require.NoError(t, err)
writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
kvSize := 0
keyIdx := 0
@ -785,12 +781,12 @@ func TestReadAllData(t *testing.T) {
key := []byte(fmt.Sprintf("key%010d_%d", keyIdx, fileIdx))
keyIdx++
kvSize += len(key) + len(val)
err = writer.WriteRow(ctx, key, val)
err := writer.WriteRow(ctx, key, val)
require.NoError(t, err)
}
// write some extra data that is greater than readRangeEnd
err = writer.WriteRow(ctx, keyAfterRange, val)
err := writer.WriteRow(ctx, keyAfterRange, val)
require.NoError(t, err)
err = writer.WriteRow(ctx, keyAfterRange2, make([]byte, 900*1024))
require.NoError(t, err)

View File

@ -220,8 +220,7 @@ func TestSwitchMode(t *testing.T) {
SetPropKeysDistance(2).
BuildOneFile(st, "/test", "0")
err := writer.Init(ctx, 5*1024*1024)
require.NoError(t, err)
writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
kvCnt := 1000000
kvs := make([]common.KvPair, kvCnt)
@ -241,7 +240,7 @@ func TestSwitchMode(t *testing.T) {
require.NoError(t, err)
}
err = writer.Close(ctx)
err := writer.Close(ctx)
require.NoError(t, err)
pool := membuf.NewPool()
ConcurrentReaderBufferSizePerConc = rand.Intn(100) + 1

View File

@ -71,6 +71,13 @@ func (s *KeyValueStore) addEncodedData(data []byte) error {
return nil
}
func (s *KeyValueStore) addRawKV(key, val []byte) error {
length := len(key) + len(val) + lengthBytes*2
buf := make([]byte, length)
encodeToBuf(buf, key, val)
return s.addEncodedData(buf[:length])
}
// finish closes the KeyValueStore and appends the last range property.
func (s *KeyValueStore) finish() {
if s.rc != nil {

View File

@ -165,10 +165,7 @@ func mergeOverlappingFilesInternal(
SetBlockSize(blockSize).
SetOnCloseFunc(onClose).
BuildOneFile(store, newFilePrefix, writerID)
err = writer.Init(ctx, partSize)
if err != nil {
return nil
}
writer.InitPartSizeAndLogger(ctx, partSize)
defer func() {
err2 := writer.Close(ctx)
if err2 == nil {

View File

@ -104,11 +104,7 @@ func MergeOverlappingFilesV2(
}
}()
err = writer.Init(ctx, partSize)
if err != nil {
logutil.Logger(ctx).Warn("init writer failed", zap.Error(err))
return
}
writer.InitPartSizeAndLogger(ctx, partSize)
bufPool := membuf.NewPool()
loaded := &memKVsAndBuffers{}

View File

@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/pkg/ingestor/engineapi"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/membuf"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)
@ -58,54 +59,158 @@ type OneFileWriter struct {
onClose OnCloseFunc
closed bool
onDup engineapi.OnDuplicateKey
// for duplicate detection.
onDup engineapi.OnDuplicateKey
pivotKey []byte
pivotValue []byte
// number of keys that duplicate pivotKey, including pivotKey itself, so it is
// always >= 1 after pivotKey is set.
currDupCnt int
// the fields below are only used when onDup is OnDuplicateKeyRecord.
recordedDupCnt int
dupFile string
dupWriter storage.ExternalFileWriter
dupKVStore *KeyValueStore
minKey []byte
maxKey []byte
logger *zap.Logger
logger *zap.Logger
partSize int64
}
// initWriter inits the underlying dataFile/statFile path, dataWriter/statWriter for OneFileWriter.
func (w *OneFileWriter) initWriter(ctx context.Context, partSize int64) (
err error,
) {
w.dataFile = filepath.Join(w.filenamePrefix, "one-file")
w.dataWriter, err = w.store.Create(ctx, w.dataFile, &storage.WriterOption{
// lazyInitWriter lazily inits the underlying dataFile/statFile path and
// dataWriter/statWriter for OneFileWriter, since when OnDup is remove the
// target file might end up empty.
func (w *OneFileWriter) lazyInitWriter(ctx context.Context) (err error) {
if w.dataWriter != nil {
return nil
}
dataFile := filepath.Join(w.filenamePrefix, "one-file")
dataWriter, err := w.store.Create(ctx, dataFile, &storage.WriterOption{
Concurrency: maxUploadWorkersPerThread,
PartSize: partSize})
PartSize: w.partSize})
if err != nil {
return err
}
w.statFile = filepath.Join(w.filenamePrefix+statSuffix, "one-file")
w.statWriter, err = w.store.Create(ctx, w.statFile, &storage.WriterOption{
statFile := filepath.Join(w.filenamePrefix+statSuffix, "one-file")
statWriter, err := w.store.Create(ctx, statFile, &storage.WriterOption{
Concurrency: maxUploadWorkersPerThread,
PartSize: MinUploadPartSize})
if err != nil {
w.logger.Info("create stat writer failed", zap.Error(err))
_ = w.dataWriter.Close(ctx)
_ = dataWriter.Close(ctx)
return err
}
w.logger.Info("one file writer", zap.String("data-file", w.dataFile), zap.String("stat-file", w.statFile))
return nil
}
w.logger.Info("one file writer", zap.String("data-file", dataFile),
zap.String("stat-file", statFile), zap.Stringer("on-dup", w.onDup))
// Init inits the OneFileWriter and its underlying KeyValueStore.
func (w *OneFileWriter) Init(ctx context.Context, partSize int64) (err error) {
w.logger = logutil.Logger(ctx)
err = w.initWriter(ctx, partSize)
if err != nil {
return err
}
w.dataFile, w.dataWriter = dataFile, dataWriter
w.statFile, w.statWriter = statFile, statWriter
w.kvStore = NewKeyValueStore(ctx, w.dataWriter, w.rc)
return nil
}
func (w *OneFileWriter) lazyInitDupFile(ctx context.Context) error {
if w.dupWriter != nil {
return nil
}
dupFile := filepath.Join(w.filenamePrefix+dupSuffix, "one-file")
dupWriter, err := w.store.Create(ctx, dupFile, &storage.WriterOption{
// too many duplicates make the duplicate-resolution phase very slow, so we
// temporarily use 1: we don't expect many duplicates, and if there are, it
// will be slow anyway.
// we also need to consider memory usage if we want to increase it later.
Concurrency: 1,
PartSize: w.partSize})
if err != nil {
w.logger.Info("create dup writer failed", zap.Error(err))
return err
}
w.dupFile = dupFile
w.dupWriter = dupWriter
w.dupKVStore = NewKeyValueStore(ctx, w.dupWriter, nil)
return nil
}
// InitPartSizeAndLogger sets the part size and the logger of the OneFileWriter;
// the underlying writers and KeyValueStore are created lazily on the first write.
func (w *OneFileWriter) InitPartSizeAndLogger(ctx context.Context, partSize int64) {
w.logger = logutil.Logger(ctx)
w.partSize = partSize
}
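
For orientation, a minimal sketch (not part of the diff) of how a call site looks after this change: InitPartSizeAndLogger only records the part size and logger and cannot fail, and the data/stat files are created lazily on the first WriteRow. The store argument and the "/example" path are placeholders.

// writeOneFileExample shows the new flow introduced by this commit.
func writeOneFileExample(ctx context.Context, store storage.ExternalStorage) error {
	writer := NewWriterBuilder().BuildOneFile(store, "/example", "writerID")
	// no error to check anymore; no file is created yet
	writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
	// the data/stat writers are created here, on the first row
	if err := writer.WriteRow(ctx, []byte("key"), []byte("val")); err != nil {
		return err
	}
	return writer.Close(ctx)
}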
// WriteRow implements ingest.Writer.
func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) error {
if w.onDup != engineapi.OnDuplicateKeyIgnore {
// must be Record or Remove right now
return w.handleDupAndWrite(ctx, idxKey, idxVal)
}
return w.doWriteRow(ctx, idxKey, idxVal)
}
func (w *OneFileWriter) handleDupAndWrite(ctx context.Context, idxKey, idxVal []byte) error {
if w.currDupCnt == 0 {
return w.onNextPivot(ctx, idxKey, idxVal)
}
if slices.Compare(w.pivotKey, idxKey) == 0 {
w.currDupCnt++
if w.onDup == engineapi.OnDuplicateKeyRecord {
// record the first 2 duplicates to the data file, the rest to the dup file.
if w.currDupCnt == 2 {
if err := w.doWriteRow(ctx, w.pivotKey, w.pivotValue); err != nil {
return err
}
if err := w.doWriteRow(ctx, idxKey, idxVal); err != nil {
return err
}
} else {
// w.currDupCnt > 2
if err := w.lazyInitDupFile(ctx); err != nil {
return err
}
if err := w.dupKVStore.addRawKV(idxKey, idxVal); err != nil {
return err
}
w.recordedDupCnt++
}
}
} else {
return w.onNextPivot(ctx, idxKey, idxVal)
}
return nil
}
func (w *OneFileWriter) onNextPivot(ctx context.Context, idxKey, idxVal []byte) error {
if w.currDupCnt == 1 {
// last pivot has no duplicate.
if err := w.doWriteRow(ctx, w.pivotKey, w.pivotValue); err != nil {
return err
}
}
if idxKey != nil {
w.pivotKey = slices.Clone(idxKey)
w.pivotValue = slices.Clone(idxVal)
w.currDupCnt = 1
} else {
w.pivotKey, w.pivotValue = nil, nil
w.currDupCnt = 0
}
return nil
}
func (w *OneFileWriter) handlePivotOnClose(ctx context.Context) error {
return w.onNextPivot(ctx, nil, nil)
}
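
Because the one-file writer receives keys in sorted order, equal keys always arrive adjacently, which is what the pivot bookkeeping above relies on. Below is a self-contained sketch (illustration only, not the diff's code) of the same OnDuplicateKeyRecord policy over a sorted string slice: unique keys plus the first two copies of a duplicated key go to the data output, every later copy goes to the dup output.

// dedupSorted mirrors the pivot logic for OnDuplicateKeyRecord over a sorted
// key stream.
func dedupSorted(keys []string) (data, dup []string) {
	var pivot string
	dupCnt := 0
	flushPivot := func() {
		if dupCnt == 1 { // the pivot had no duplicate, write it out now
			data = append(data, pivot)
		}
	}
	for _, k := range keys {
		if dupCnt > 0 && k == pivot {
			dupCnt++
			if dupCnt == 2 {
				// first duplicate seen: both copies go to the data output
				data = append(data, pivot, k)
			} else {
				// third and later copies are recorded separately
				dup = append(dup, k)
			}
			continue
		}
		flushPivot()
		pivot, dupCnt = k, 1
	}
	flushPivot() // the handlePivotOnClose step
	return data, dup
}

For OnDuplicateKeyRemove the only difference is that the dupCnt == 2 branch writes nothing, so only keys that never repeat reach the data output.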
func (w *OneFileWriter) doWriteRow(ctx context.Context, idxKey, idxVal []byte) error {
if w.minKey == nil {
w.minKey = slices.Clone(idxKey)
}
if err := w.lazyInitWriter(ctx); err != nil {
return err
}
// 1. encode data and write to kvStore.
keyLen := len(idxKey)
length := len(idxKey) + len(idxVal) + lengthBytes*2
@ -128,11 +233,8 @@ func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) err
// the new prop should have the same offset with kvStore.
w.rc.currProp.offset = w.kvStore.offset
}
binary.BigEndian.AppendUint64(buf[:0], uint64(keyLen))
binary.BigEndian.AppendUint64(buf[lengthBytes:lengthBytes], uint64(len(idxVal)))
copy(buf[lengthBytes*2:], idxKey)
encodeToBuf(buf, idxKey, idxVal)
w.maxKey = buf[lengthBytes*2 : lengthBytes*2+keyLen]
copy(buf[lengthBytes*2+keyLen:], idxVal)
err := w.kvStore.addEncodedData(buf[:length])
if err != nil {
return err
@ -151,22 +253,33 @@ func (w *OneFileWriter) Close(ctx context.Context) error {
if err != nil {
return err
}
w.logger.Info("close one file writer",
zap.String("writerID", w.writerID))
w.logger.Info("close one file writer", zap.String("writerID", w.writerID))
maxKey := slices.Clone(w.maxKey)
var stat MultipleFilesStat
stat.Filenames = append(stat.Filenames,
[2]string{w.dataFile, w.statFile})
stat.build([]tidbkv.Key{w.minKey}, []tidbkv.Key{maxKey})
var minKey, maxKey []byte
mStats := make([]MultipleFilesStat, 0, 1)
if w.totalCnt > 0 {
// it's possible that all KV pairs are duplicates and removed.
minKey = w.minKey
maxKey = slices.Clone(w.maxKey)
var stat MultipleFilesStat
stat.Filenames = append(stat.Filenames, [2]string{w.dataFile, w.statFile})
stat.build([]tidbkv.Key{w.minKey}, []tidbkv.Key{maxKey})
mStats = append(mStats, stat)
}
conflictInfo := engineapi.ConflictInfo{}
if w.recordedDupCnt > 0 {
conflictInfo.Count = uint64(w.recordedDupCnt)
conflictInfo.Files = []string{w.dupFile}
}
w.onClose(&WriterSummary{
WriterID: w.writerID,
Seq: 0,
Min: w.minKey,
Min: minKey,
Max: maxKey,
TotalSize: w.totalSize,
TotalCnt: w.totalCnt,
MultipleFilesStats: []MultipleFilesStat{stat},
MultipleFilesStats: mStats,
ConflictInfo: conflictInfo,
})
w.totalCnt = 0
w.totalSize = 0
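
The ConflictInfo built in Close is only populated when duplicates were actually spilled to the dup file. A hedged sketch of an OnCloseFunc that aggregates it across writers (WriterSummary and OnCloseFunc come from the diff; the collector itself is illustrative and a real one would need synchronization for concurrent writers):

// newConflictCollector returns an OnCloseFunc that sums recorded duplicates
// and remembers the duplicate-KV files reported by each writer (sketch only).
func newConflictCollector(totalDup *uint64, dupFiles *[]string) OnCloseFunc {
	return func(s *WriterSummary) {
		if s.ConflictInfo.Count > 0 {
			*totalDup += s.ConflictInfo.Count
			*dupFiles = append(*dupFiles, s.ConflictInfo.Files...)
		}
	}
}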
@ -175,27 +288,50 @@ func (w *OneFileWriter) Close(ctx context.Context) error {
}
func (w *OneFileWriter) closeImpl(ctx context.Context) (err error) {
// 1. write remaining statistic.
w.kvStore.finish()
encodedStat := w.rc.encode()
_, err = w.statWriter.Write(ctx, encodedStat)
if err != nil {
return err
}
w.rc.reset()
// 2. close data writer.
err1 := w.dataWriter.Close(ctx)
if err1 != nil {
err = err1
w.logger.Error("Close data writer failed", zap.Error(err))
if err = w.handlePivotOnClose(ctx); err != nil {
return
}
// 3. close stat writer.
err2 := w.statWriter.Close(ctx)
if err2 != nil {
err = err2
w.logger.Error("Close stat writer failed", zap.Error(err))
return
if w.dataWriter != nil {
// 1. write remaining statistic.
w.kvStore.finish()
encodedStat := w.rc.encode()
_, err = w.statWriter.Write(ctx, encodedStat)
if err != nil {
return err
}
w.rc.reset()
// 2. close data writer.
err1 := w.dataWriter.Close(ctx)
if err1 != nil {
err = err1
w.logger.Error("Close data writer failed", zap.Error(err))
return
}
// 3. close stat writer.
err2 := w.statWriter.Close(ctx)
if err2 != nil {
err = err2
w.logger.Error("Close stat writer failed", zap.Error(err))
return
}
}
if w.dupWriter != nil {
w.dupKVStore.finish()
if err3 := w.dupWriter.Close(ctx); err3 != nil {
err = err3
w.logger.Error("Close dup writer failed", zap.Error(err))
return
}
}
return nil
}
// caller should make sure the buf is large enough to hold the encoded data.
func encodeToBuf(buf, key, value []byte) {
intest.Assert(len(buf) == lengthBytes*2+len(key)+len(value))
keyLen := len(key)
binary.BigEndian.AppendUint64(buf[:0], uint64(keyLen))
binary.BigEndian.AppendUint64(buf[lengthBytes:lengthBytes], uint64(len(value)))
copy(buf[lengthBytes*2:], key)
copy(buf[lengthBytes*2+keyLen:], value)
}
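
encodeToBuf lays a record out as two 8-byte big-endian lengths followed by the key and value bytes (lengthBytes is 8 here). A hedged decode counterpart, for illustration only and not part of the package (assumes the encoding/binary and io imports; lengthBytes is redeclared locally to keep the sketch self-contained):

// decodeKV reads one record in encodeToBuf's layout:
// [8-byte BE key length][8-byte BE value length][key][value].
func decodeKV(buf []byte) (key, val []byte, err error) {
	const lengthBytes = 8
	if len(buf) < lengthBytes*2 {
		return nil, nil, io.ErrUnexpectedEOF
	}
	keyLen := binary.BigEndian.Uint64(buf)
	valLen := binary.BigEndian.Uint64(buf[lengthBytes:])
	total := lengthBytes*2 + keyLen + valLen
	if uint64(len(buf)) < total {
		return nil, nil, io.ErrUnexpectedEOF
	}
	key = buf[lengthBytes*2 : lengthBytes*2+keyLen]
	val = buf[lengthBytes*2+keyLen : total]
	return key, val, nil
}

addRawKV writes the dup file with the same layout, which is why the tests below can read conflict files back through the regular KV reader.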

View File

@ -51,7 +51,7 @@ func TestOnefileWriterBasic(t *testing.T) {
SetPropKeysDistance(2).
BuildOneFile(memStore, "/test", "0")
require.NoError(t, writer.Init(ctx, 5*1024*1024))
writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
kvCnt := 100
kvs := make([]common.KvPair, kvCnt)
@ -122,7 +122,7 @@ func checkOneFileWriterStatWithDistance(t *testing.T, kvCnt int, keysDistance ui
SetPropKeysDistance(keysDistance).
BuildOneFile(memStore, "/"+prefix, "0")
require.NoError(t, writer.Init(ctx, 5*1024*1024))
writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
kvs := make([]common.KvPair, 0, kvCnt)
for i := 0; i < kvCnt; i++ {
kvs = append(kvs, common.KvPair{
@ -267,7 +267,7 @@ func TestOnefileWriterManyRows(t *testing.T) {
SetMemorySizeLimit(1000).
BuildOneFile(memStore, "/test", "0")
require.NoError(t, writer.Init(ctx, 5*1024*1024))
writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
kvCnt := 100000
expectedTotalSize := 0
@ -366,7 +366,7 @@ func TestOnefilePropOffset(t *testing.T) {
SetMemorySizeLimit(uint64(memSizeLimit)).
BuildOneFile(memStore, "/test", "0")
require.NoError(t, writer.Init(ctx, 5*1024*1024))
writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
kvCnt := 10000
kvs := make([]common.KvPair, kvCnt)
@ -399,3 +399,21 @@ func TestOnefilePropOffset(t *testing.T) {
lastOffset = prop.offset
}
}
type testOneFileWriter struct {
*OneFileWriter
}
func (w *testOneFileWriter) WriteRow(ctx context.Context, key, val []byte, _ dbkv.Handle) error {
return w.OneFileWriter.WriteRow(ctx, key, val)
}
func TestOnefileWriterOnDup(t *testing.T) {
getWriterFn := func(store storage.ExternalStorage, b *WriterBuilder) testWriter {
writer := b.BuildOneFile(store, "/onefile", "0")
writer.InitPartSizeAndLogger(context.Background(), 1024)
return &testOneFileWriter{OneFileWriter: writer}
}
doTestWriterOnDupRecord(t, true, getWriterFn)
doTestWriterOnDupRemove(t, true, getWriterFn)
}

View File

@ -85,7 +85,7 @@ func TestReadAllOneFile(t *testing.T) {
SetMemorySizeLimit(uint64(memSizeLimit)).
BuildOneFile(memStore, "/test", "0")
require.NoError(t, w.Init(ctx, int64(5*size.MB)))
w.InitPartSizeAndLogger(ctx, int64(5*size.MB))
kvCnt := rand.Intn(10) + 10000
kvs := make([]common.KvPair, kvCnt)
@ -123,7 +123,7 @@ func TestReadLargeFile(t *testing.T) {
SetPropKeysDistance(1000).
BuildOneFile(memStore, "/test", "0")
require.NoError(t, w.Init(ctx, int64(5*size.MB)))
w.InitPartSizeAndLogger(ctx, int64(5*size.MB))
val := make([]byte, 10000)
for i := 0; i < 10000; i++ {

View File

@ -414,10 +414,9 @@ func Test3KFilesRangeSplitter(t *testing.T) {
SetPropSizeDistance(size.MB).
SetOnCloseFunc(onClose).
BuildOneFile(store, "/mock-test", uuid.New().String())
err := w.Init(ctx, int64(5*size.MB))
require.NoError(t, err)
w.InitPartSizeAndLogger(ctx, int64(5*size.MB))
// we don't need data files
err = w.dataWriter.Close(ctx)
err := w.dataWriter.Close(ctx)
require.NoError(t, err)
w.dataWriter = storage.NoopWriter{}

View File

@ -22,6 +22,7 @@ import (
"io"
"path"
"slices"
"sort"
"strconv"
"strings"
"testing"
@ -606,7 +607,7 @@ func TestGetAdjustedIndexBlockSize(t *testing.T) {
require.EqualValues(t, 16*units.MiB, GetAdjustedBlockSize(166*units.MiB))
}
func readKVFile(t *testing.T, store *writerFirstCloseFailStorage, filename string) []kvPair {
func readKVFile(t *testing.T, store storage.ExternalStorage, filename string) []kvPair {
t.Helper()
reader, err := newKVReader(context.Background(), filename, store, 0, units.KiB)
require.NoError(t, err)
@ -622,15 +623,44 @@ func readKVFile(t *testing.T, store *writerFirstCloseFailStorage, filename strin
return kvs
}
func TestWriterOnDupRecord(t *testing.T) {
ctx := context.Background()
store := &writerFirstCloseFailStorage{ExternalStorage: storage.NewMemStorage(), shouldFail: true}
var summary *WriterSummary
type testWriter interface {
WriteRow(ctx context.Context, key, val []byte, handle dbkv.Handle) error
Close(ctx context.Context) error
}
t.Run("all duplicated, flush once", func(t *testing.T) {
writer := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240).
SetOnCloseFunc(func(s *WriterSummary) { summary = s }).SetOnDup(engineapi.OnDuplicateKeyRecord).
Build(store, "/test", "0")
func TestWriterOnDup(t *testing.T) {
getWriterFn := func(store storage.ExternalStorage, b *WriterBuilder) testWriter {
return b.Build(store, "/test", "0")
}
doTestWriterOnDupRecord(t, false, getWriterFn)
doTestWriterOnDupRemove(t, false, getWriterFn)
}
func doTestWriterOnDupRecord(t *testing.T, testingOneFile bool, getWriter func(store storage.ExternalStorage, b *WriterBuilder) testWriter) {
t.Helper()
ctx := context.Background()
store := storage.NewMemStorage()
var summary *WriterSummary
doGetWriter := func(store storage.ExternalStorage, builder *WriterBuilder) testWriter {
builder = builder.SetOnCloseFunc(func(s *WriterSummary) { summary = s }).SetOnDup(engineapi.OnDuplicateKeyRecord)
return getWriter(store, builder)
}
t.Run("write nothing", func(t *testing.T) {
builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240)
writer := doGetWriter(store, builder)
require.NoError(t, writer.Close(ctx))
require.Empty(t, summary.Min)
require.Empty(t, summary.Max)
require.Zero(t, summary.TotalCnt)
require.Zero(t, summary.TotalSize)
require.Zero(t, summary.ConflictInfo.Count)
require.Empty(t, summary.ConflictInfo.Files)
})
t.Run("all duplicated", func(t *testing.T) {
builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240)
writer := doGetWriter(store, builder)
for range 5 {
require.NoError(t, writer.WriteRow(ctx, []byte("1111"), []byte("vvvv"), nil))
}
@ -649,12 +679,50 @@ func TestWriterOnDupRecord(t *testing.T) {
}
})
t.Run("with different duplicated kv, flush twice", func(t *testing.T) {
t.Run("with different duplicated kv, first kv not duplicated", func(t *testing.T) {
// each KV will take 24 bytes, so we flush every 10 KVs
writer := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240).
SetOnCloseFunc(func(s *WriterSummary) { summary = s }).SetOnDup(engineapi.OnDuplicateKeyRecord).
Build(store, "/test", "0")
for _, p := range []struct {
builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240)
writer := doGetWriter(store, builder)
input := []struct {
pair *kvPair
cnt int
}{
{pair: &kvPair{key: []byte("2222"), value: []byte("vvvv")}, cnt: 1},
{pair: &kvPair{key: []byte("1111"), value: []byte("vvvv")}, cnt: 1},
{pair: &kvPair{key: []byte("6666"), value: []byte("vvvv")}, cnt: 3},
{pair: &kvPair{key: []byte("7777"), value: []byte("vvvv")}, cnt: 5},
}
if testingOneFile {
sort.Slice(input, func(i, j int) bool {
return bytes.Compare(input[i].pair.key, input[j].pair.key) < 0
})
}
for _, p := range input {
for i := 0; i < p.cnt; i++ {
require.NoError(t, writer.WriteRow(ctx, p.pair.key, p.pair.value, nil))
}
}
require.NoError(t, writer.Close(ctx))
require.EqualValues(t, []byte("1111"), summary.Min)
require.EqualValues(t, []byte("7777"), summary.Max)
require.EqualValues(t, 6, summary.TotalCnt)
require.EqualValues(t, 48, summary.TotalSize)
require.EqualValues(t, 4, summary.ConflictInfo.Count)
require.Len(t, summary.ConflictInfo.Files, 1)
kvs := readKVFile(t, store, summary.ConflictInfo.Files[0])
require.EqualValues(t, []kvPair{
{key: []byte("6666"), value: []byte("vvvv")},
{key: []byte("7777"), value: []byte("vvvv")},
{key: []byte("7777"), value: []byte("vvvv")},
{key: []byte("7777"), value: []byte("vvvv")},
}, kvs)
})
t.Run("with different duplicated kv, first kv duplicated", func(t *testing.T) {
// each KV will take 24 bytes, so we flush every 10 KVs
builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240)
writer := doGetWriter(store, builder)
input := []struct {
pair *kvPair
cnt int
}{
@ -665,7 +733,13 @@ func TestWriterOnDupRecord(t *testing.T) {
{pair: &kvPair{key: []byte("7777"), value: []byte("vvvv")}, cnt: 1},
{pair: &kvPair{key: []byte("4444"), value: []byte("vvvv")}, cnt: 4},
{pair: &kvPair{key: []byte("3333"), value: []byte("vvvv")}, cnt: 4},
} {
}
if testingOneFile {
sort.Slice(input, func(i, j int) bool {
return bytes.Compare(input[i].pair.key, input[j].pair.key) < 0
})
}
for _, p := range input {
for i := 0; i < p.cnt; i++ {
require.NoError(t, writer.WriteRow(ctx, p.pair.key, p.pair.value, nil))
}
@ -676,33 +750,64 @@ func TestWriterOnDupRecord(t *testing.T) {
require.EqualValues(t, 12, summary.TotalCnt)
require.EqualValues(t, 96, summary.TotalSize)
require.EqualValues(t, 8, summary.ConflictInfo.Count)
require.Len(t, summary.ConflictInfo.Files, 2)
kvs := readKVFile(t, store, summary.ConflictInfo.Files[0])
require.EqualValues(t, []kvPair{
{key: []byte("1111"), value: []byte("vvvv")},
{key: []byte("1111"), value: []byte("vvvv")},
{key: []byte("1111"), value: []byte("vvvv")},
{key: []byte("2222"), value: []byte("vvvv")},
}, kvs)
kvs = readKVFile(t, store, summary.ConflictInfo.Files[1])
require.EqualValues(t, []kvPair{
{key: []byte("3333"), value: []byte("vvvv")},
{key: []byte("3333"), value: []byte("vvvv")},
{key: []byte("4444"), value: []byte("vvvv")},
{key: []byte("4444"), value: []byte("vvvv")},
}, kvs)
if testingOneFile {
require.Len(t, summary.ConflictInfo.Files, 1)
kvs := readKVFile(t, store, summary.ConflictInfo.Files[0])
require.EqualValues(t, []kvPair{
{key: []byte("1111"), value: []byte("vvvv")},
{key: []byte("1111"), value: []byte("vvvv")},
{key: []byte("1111"), value: []byte("vvvv")},
{key: []byte("2222"), value: []byte("vvvv")},
{key: []byte("3333"), value: []byte("vvvv")},
{key: []byte("3333"), value: []byte("vvvv")},
{key: []byte("4444"), value: []byte("vvvv")},
{key: []byte("4444"), value: []byte("vvvv")},
}, kvs)
} else {
require.Len(t, summary.ConflictInfo.Files, 2)
kvs := readKVFile(t, store, summary.ConflictInfo.Files[0])
require.EqualValues(t, []kvPair{
{key: []byte("1111"), value: []byte("vvvv")},
{key: []byte("1111"), value: []byte("vvvv")},
{key: []byte("1111"), value: []byte("vvvv")},
{key: []byte("2222"), value: []byte("vvvv")},
}, kvs)
kvs = readKVFile(t, store, summary.ConflictInfo.Files[1])
require.EqualValues(t, []kvPair{
{key: []byte("3333"), value: []byte("vvvv")},
{key: []byte("3333"), value: []byte("vvvv")},
{key: []byte("4444"), value: []byte("vvvv")},
{key: []byte("4444"), value: []byte("vvvv")},
}, kvs)
}
})
}
func TestWriterOnDupRemove(t *testing.T) {
func doTestWriterOnDupRemove(t *testing.T, testingOneFile bool, getWriter func(storage.ExternalStorage, *WriterBuilder) testWriter) {
t.Helper()
ctx := context.Background()
store := &writerFirstCloseFailStorage{ExternalStorage: storage.NewMemStorage(), shouldFail: true}
store := storage.NewMemStorage()
var summary *WriterSummary
doGetWriter := func(store storage.ExternalStorage, builder *WriterBuilder) testWriter {
builder = builder.SetOnCloseFunc(func(s *WriterSummary) { summary = s }).SetOnDup(engineapi.OnDuplicateKeyRemove)
return getWriter(store, builder)
}
t.Run("all duplicated, flush once", func(t *testing.T) {
writer := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240).
SetOnCloseFunc(func(s *WriterSummary) { summary = s }).SetOnDup(engineapi.OnDuplicateKeyRemove).
Build(store, "/test", "0")
t.Run("write nothing", func(t *testing.T) {
builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240)
writer := doGetWriter(store, builder)
require.NoError(t, writer.Close(ctx))
require.Empty(t, summary.Min)
require.Empty(t, summary.Max)
require.Zero(t, summary.TotalCnt)
require.Zero(t, summary.TotalSize)
require.Zero(t, summary.ConflictInfo.Count)
require.Empty(t, summary.ConflictInfo.Files)
})
t.Run("all duplicated", func(t *testing.T) {
builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240)
writer := doGetWriter(store, builder)
for range 5 {
require.NoError(t, writer.WriteRow(ctx, []byte("1111"), []byte("vvvv"), nil))
}
@ -716,12 +821,43 @@ func TestWriterOnDupRemove(t *testing.T) {
require.Empty(t, summary.ConflictInfo.Files)
})
t.Run("with different duplicated kv, flush twice", func(t *testing.T) {
t.Run("with different duplicated kv, first kv not duplicated", func(t *testing.T) {
// each KV will take 24 bytes, so we flush every 10 KVs
writer := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240).
SetOnCloseFunc(func(s *WriterSummary) { summary = s }).SetOnDup(engineapi.OnDuplicateKeyRemove).
Build(store, "/test", "0")
for _, p := range []struct {
builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240)
writer := doGetWriter(store, builder)
input := []struct {
pair *kvPair
cnt int
}{
{pair: &kvPair{key: []byte("2222"), value: []byte("vvvv")}, cnt: 1},
{pair: &kvPair{key: []byte("1111"), value: []byte("vvvv")}, cnt: 1},
{pair: &kvPair{key: []byte("6666"), value: []byte("vvvv")}, cnt: 3},
{pair: &kvPair{key: []byte("7777"), value: []byte("vvvv")}, cnt: 5},
}
if testingOneFile {
sort.Slice(input, func(i, j int) bool {
return bytes.Compare(input[i].pair.key, input[j].pair.key) < 0
})
}
for _, p := range input {
for i := 0; i < p.cnt; i++ {
require.NoError(t, writer.WriteRow(ctx, p.pair.key, p.pair.value, nil))
}
}
require.NoError(t, writer.Close(ctx))
require.EqualValues(t, []byte("1111"), summary.Min)
require.EqualValues(t, []byte("2222"), summary.Max)
require.EqualValues(t, 2, summary.TotalCnt)
require.EqualValues(t, 16, summary.TotalSize)
require.EqualValues(t, 0, summary.ConflictInfo.Count)
require.Empty(t, summary.ConflictInfo.Files)
})
t.Run("with different duplicated kv, first kv duplicated", func(t *testing.T) {
// each KV will take 24 bytes, so we flush every 10 KVs
builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240)
writer := doGetWriter(store, builder)
input := []struct {
pair *kvPair
cnt int
}{
@ -732,7 +868,13 @@ func TestWriterOnDupRemove(t *testing.T) {
{pair: &kvPair{key: []byte("7777"), value: []byte("vvvv")}, cnt: 1},
{pair: &kvPair{key: []byte("4444"), value: []byte("vvvv")}, cnt: 4},
{pair: &kvPair{key: []byte("3333"), value: []byte("vvvv")}, cnt: 4},
} {
}
if testingOneFile {
sort.Slice(input, func(i, j int) bool {
return bytes.Compare(input[i].pair.key, input[j].pair.key) < 0
})
}
for _, p := range input {
for i := 0; i < p.cnt; i++ {
require.NoError(t, writer.WriteRow(ctx, p.pair.key, p.pair.value, nil))
}