diff --git a/pkg/lightning/backend/external/bench_test.go b/pkg/lightning/backend/external/bench_test.go
index cb799452de..542f02e482 100644
--- a/pkg/lightning/backend/external/bench_test.go
+++ b/pkg/lightning/backend/external/bench_test.go
@@ -138,7 +138,7 @@ func writeExternalOneFile(s *writeTestSuite) {
 	}
 	writer := builder.BuildOneFile(
 		s.store, filePath, "writerID")
-	intest.AssertNoError(writer.Init(ctx, 20*1024*1024))
+	writer.InitPartSizeAndLogger(ctx, 20*1024*1024)
 
 	var minKey, maxKey []byte
 	key, val, _ := s.source.next()
@@ -697,10 +697,9 @@ func TestReadAllData(t *testing.T) {
 		eg.Go(func() error {
 			fileName := fmt.Sprintf("/test%d", fileIdx)
 			writer := NewWriterBuilder().BuildOneFile(store, fileName, "writerID")
-			err := writer.Init(ctx, 5*1024*1024)
-			require.NoError(t, err)
+			writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
 			key := []byte(fmt.Sprintf("key0%d", fileIdx))
-			err = writer.WriteRow(ctx, key, val)
+			err := writer.WriteRow(ctx, key, val)
 			require.NoError(t, err)
 
 			// write some extra data that is greater than readRangeEnd
@@ -720,8 +719,7 @@ func TestReadAllData(t *testing.T) {
 		eg.Go(func() error {
 			fileName := fmt.Sprintf("/test%d", fileIdx)
 			writer := NewWriterBuilder().BuildOneFile(store, fileName, "writerID")
-			err := writer.Init(ctx, 5*1024*1024)
-			require.NoError(t, err)
+			writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
 
 			kvSize := 0
 			keyIdx := 0
@@ -729,12 +727,12 @@ func TestReadAllData(t *testing.T) {
 				key := []byte(fmt.Sprintf("key%06d_%d", keyIdx, fileIdx))
 				keyIdx++
 				kvSize += len(key) + len(val)
-				err = writer.WriteRow(ctx, key, val)
+				err := writer.WriteRow(ctx, key, val)
 				require.NoError(t, err)
 			}
 
 			// write some extra data that is greater than readRangeEnd
-			err = writer.WriteRow(ctx, keyAfterRange, val)
+			err := writer.WriteRow(ctx, keyAfterRange, val)
 			require.NoError(t, err)
 			err = writer.WriteRow(ctx, keyAfterRange2, make([]byte, 300*1024))
 			require.NoError(t, err)
@@ -749,8 +747,7 @@ func TestReadAllData(t *testing.T) {
 		eg.Go(func() error {
 			fileName := fmt.Sprintf("/test%d", fileIdx)
 			writer := NewWriterBuilder().BuildOneFile(store, fileName, "writerID")
-			err := writer.Init(ctx, 5*1024*1024)
-			require.NoError(t, err)
+			writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
 
 			kvSize := 0
 			keyIdx := 0
@@ -758,12 +755,12 @@ func TestReadAllData(t *testing.T) {
 				key := []byte(fmt.Sprintf("key%09d_%d", keyIdx, fileIdx))
 				keyIdx++
 				kvSize += len(key) + len(val)
-				err = writer.WriteRow(ctx, key, val)
+				err := writer.WriteRow(ctx, key, val)
 				require.NoError(t, err)
 			}
 
 			// write some extra data that is greater than readRangeEnd
-			err = writer.WriteRow(ctx, keyAfterRange, val)
+			err := writer.WriteRow(ctx, keyAfterRange, val)
 			require.NoError(t, err)
 			err = writer.WriteRow(ctx, keyAfterRange2, make([]byte, 900*1024))
 			require.NoError(t, err)
@@ -776,8 +773,7 @@ func TestReadAllData(t *testing.T) {
 	for ; fileIdx < 2091; fileIdx++ {
 		fileName := fmt.Sprintf("/test%d", fileIdx)
 		writer := NewWriterBuilder().BuildOneFile(store, fileName, "writerID")
-		err := writer.Init(ctx, 5*1024*1024)
-		require.NoError(t, err)
+		writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
 
 		kvSize := 0
 		keyIdx := 0
@@ -785,12 +781,12 @@ func TestReadAllData(t *testing.T) {
 			key := []byte(fmt.Sprintf("key%010d_%d", keyIdx, fileIdx))
 			keyIdx++
 			kvSize += len(key) + len(val)
-			err = writer.WriteRow(ctx, key, val)
+			err := writer.WriteRow(ctx, key, val)
 			require.NoError(t, err)
 		}
 
 		// write some extra data that is greater than readRangeEnd
-		err = writer.WriteRow(ctx, keyAfterRange, val)
+		err := writer.WriteRow(ctx, keyAfterRange, val)
 		require.NoError(t, err)
 		err = writer.WriteRow(ctx, keyAfterRange2, make([]byte, 900*1024))
 		require.NoError(t, err)
diff --git a/pkg/lightning/backend/external/byte_reader_test.go b/pkg/lightning/backend/external/byte_reader_test.go
index a5d15aca5a..157bbcc07f 100644
--- a/pkg/lightning/backend/external/byte_reader_test.go
+++ b/pkg/lightning/backend/external/byte_reader_test.go
@@ -220,8 +220,7 @@ func TestSwitchMode(t *testing.T) {
 		SetPropKeysDistance(2).
 		BuildOneFile(st, "/test", "0")
 
-	err := writer.Init(ctx, 5*1024*1024)
-	require.NoError(t, err)
+	writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
 
 	kvCnt := 1000000
 	kvs := make([]common.KvPair, kvCnt)
@@ -241,7 +240,7 @@ func TestSwitchMode(t *testing.T) {
 		require.NoError(t, err)
 	}
 
-	err = writer.Close(ctx)
+	err := writer.Close(ctx)
 	require.NoError(t, err)
 	pool := membuf.NewPool()
 	ConcurrentReaderBufferSizePerConc = rand.Intn(100) + 1
diff --git a/pkg/lightning/backend/external/file.go b/pkg/lightning/backend/external/file.go
index ab1dd804f1..2070cc8efd 100644
--- a/pkg/lightning/backend/external/file.go
+++ b/pkg/lightning/backend/external/file.go
@@ -71,6 +71,13 @@ func (s *KeyValueStore) addEncodedData(data []byte) error {
 	return nil
 }
 
+func (s *KeyValueStore) addRawKV(key, val []byte) error {
+	length := len(key) + len(val) + lengthBytes*2
+	buf := make([]byte, length)
+	encodeToBuf(buf, key, val)
+	return s.addEncodedData(buf[:length])
+}
+
 // finish closes the KeyValueStore and append the last range property.
 func (s *KeyValueStore) finish() {
 	if s.rc != nil {
diff --git a/pkg/lightning/backend/external/merge.go b/pkg/lightning/backend/external/merge.go
index 503b037d54..716111dc2b 100644
--- a/pkg/lightning/backend/external/merge.go
+++ b/pkg/lightning/backend/external/merge.go
@@ -165,10 +165,7 @@ func mergeOverlappingFilesInternal(
 		SetBlockSize(blockSize).
 		SetOnCloseFunc(onClose).
 		BuildOneFile(store, newFilePrefix, writerID)
-	err = writer.Init(ctx, partSize)
-	if err != nil {
-		return nil
-	}
+	writer.InitPartSizeAndLogger(ctx, partSize)
 	defer func() {
 		err2 := writer.Close(ctx)
 		if err2 == nil {
diff --git a/pkg/lightning/backend/external/merge_v2.go b/pkg/lightning/backend/external/merge_v2.go
index 785aba4e93..418c30520b 100644
--- a/pkg/lightning/backend/external/merge_v2.go
+++ b/pkg/lightning/backend/external/merge_v2.go
@@ -104,11 +104,7 @@ func MergeOverlappingFilesV2(
 		}
 	}()
 
-	err = writer.Init(ctx, partSize)
-	if err != nil {
-		logutil.Logger(ctx).Warn("init writer failed", zap.Error(err))
-		return
-	}
+	writer.InitPartSizeAndLogger(ctx, partSize)
 
 	bufPool := membuf.NewPool()
 	loaded := &memKVsAndBuffers{}
diff --git a/pkg/lightning/backend/external/onefile_writer.go b/pkg/lightning/backend/external/onefile_writer.go
index 0f5557d233..f22b7d099a 100644
--- a/pkg/lightning/backend/external/onefile_writer.go
+++ b/pkg/lightning/backend/external/onefile_writer.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/tidb/pkg/ingestor/engineapi"
 	tidbkv "github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tidb/pkg/lightning/membuf"
+	"github.com/pingcap/tidb/pkg/util/intest"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"go.uber.org/zap"
 )
@@ -58,54 +59,158 @@ type OneFileWriter struct {
 	onClose OnCloseFunc
 	closed  bool
-	onDup   engineapi.OnDuplicateKey
+
+	// for duplicate detection.
+	onDup      engineapi.OnDuplicateKey
+	pivotKey   []byte
+	pivotValue []byte
+	// number of keys that duplicate with pivotKey, including pivotKey itself, so it
+	// is always >= 1 after pivotKey is set.
+	currDupCnt int
+	// below fields are only used when onDup is OnDuplicateKeyRecord.
+	recordedDupCnt int
+	dupFile        string
+	dupWriter      storage.ExternalFileWriter
+	dupKVStore     *KeyValueStore
 
 	minKey []byte
 	maxKey []byte
-	logger *zap.Logger
+	logger   *zap.Logger
+	partSize int64
 }
 
-// initWriter inits the underlying dataFile/statFile path, dataWriter/statWriter for OneFileWriter.
-func (w *OneFileWriter) initWriter(ctx context.Context, partSize int64) (
-	err error,
-) {
-	w.dataFile = filepath.Join(w.filenamePrefix, "one-file")
-	w.dataWriter, err = w.store.Create(ctx, w.dataFile, &storage.WriterOption{
+// lazyInitWriter lazily inits the underlying dataFile/statFile paths and the
+// dataWriter/statWriter of OneFileWriter, since when OnDup=remove the target file might be empty.
+func (w *OneFileWriter) lazyInitWriter(ctx context.Context) (err error) {
+	if w.dataWriter != nil {
+		return nil
+	}
+
+	dataFile := filepath.Join(w.filenamePrefix, "one-file")
+	dataWriter, err := w.store.Create(ctx, dataFile, &storage.WriterOption{
 		Concurrency: maxUploadWorkersPerThread,
-		PartSize:    partSize})
+		PartSize:    w.partSize})
 	if err != nil {
 		return err
 	}
-	w.statFile = filepath.Join(w.filenamePrefix+statSuffix, "one-file")
-	w.statWriter, err = w.store.Create(ctx, w.statFile, &storage.WriterOption{
+	statFile := filepath.Join(w.filenamePrefix+statSuffix, "one-file")
+	statWriter, err := w.store.Create(ctx, statFile, &storage.WriterOption{
 		Concurrency: maxUploadWorkersPerThread,
 		PartSize:    MinUploadPartSize})
 	if err != nil {
 		w.logger.Info("create stat writer failed", zap.Error(err))
-		_ = w.dataWriter.Close(ctx)
+		_ = dataWriter.Close(ctx)
 		return err
 	}
-	w.logger.Info("one file writer", zap.String("data-file", w.dataFile), zap.String("stat-file", w.statFile))
-	return nil
-}
+	w.logger.Info("one file writer", zap.String("data-file", dataFile),
+		zap.String("stat-file", statFile), zap.Stringer("on-dup", w.onDup))
 
-// Init inits the OneFileWriter and its underlying KeyValueStore.
-func (w *OneFileWriter) Init(ctx context.Context, partSize int64) (err error) {
-	w.logger = logutil.Logger(ctx)
-	err = w.initWriter(ctx, partSize)
-	if err != nil {
-		return err
-	}
+	w.dataFile, w.dataWriter = dataFile, dataWriter
+	w.statFile, w.statWriter = statFile, statWriter
 	w.kvStore = NewKeyValueStore(ctx, w.dataWriter, w.rc)
 	return nil
 }
 
+func (w *OneFileWriter) lazyInitDupFile(ctx context.Context) error {
+	if w.dupWriter != nil {
+		return nil
+	}
+
+	dupFile := filepath.Join(w.filenamePrefix+dupSuffix, "one-file")
+	dupWriter, err := w.store.Create(ctx, dupFile, &storage.WriterOption{
+		// too many duplicates will make the duplicate-resolution part very slow, so
+		// we temporarily use 1 as we don't expect many duplicates; if there are,
+		// it will be slow anyway.
+		// we also need to consider memory usage if we want to increase it later.
+		Concurrency: 1,
+		PartSize:    w.partSize})
+	if err != nil {
+		w.logger.Info("create dup writer failed", zap.Error(err))
+		return err
+	}
+	w.dupFile = dupFile
+	w.dupWriter = dupWriter
+	w.dupKVStore = NewKeyValueStore(ctx, w.dupWriter, nil)
+	return nil
+}
+
+// InitPartSizeAndLogger sets the part size and logger for the OneFileWriter; the
+// underlying writers are initialized lazily on the first write.
+func (w *OneFileWriter) InitPartSizeAndLogger(ctx context.Context, partSize int64) {
+	w.logger = logutil.Logger(ctx)
+	w.partSize = partSize
+}
+
 // WriteRow implements ingest.Writer.
 func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) error {
+	if w.onDup != engineapi.OnDuplicateKeyIgnore {
+		// must be Record or Remove right now
+		return w.handleDupAndWrite(ctx, idxKey, idxVal)
+	}
+	return w.doWriteRow(ctx, idxKey, idxVal)
+}
+
+func (w *OneFileWriter) handleDupAndWrite(ctx context.Context, idxKey, idxVal []byte) error {
+	if w.currDupCnt == 0 {
+		return w.onNextPivot(ctx, idxKey, idxVal)
+	}
+	if slices.Compare(w.pivotKey, idxKey) == 0 {
+		w.currDupCnt++
+		if w.onDup == engineapi.OnDuplicateKeyRecord {
+			// record the first 2 duplicates to the data file, and the rest to the dup file.
+			if w.currDupCnt == 2 {
+				if err := w.doWriteRow(ctx, w.pivotKey, w.pivotValue); err != nil {
+					return err
+				}
+				if err := w.doWriteRow(ctx, idxKey, idxVal); err != nil {
+					return err
+				}
+			} else {
+				// w.currDupCnt > 2
+				if err := w.lazyInitDupFile(ctx); err != nil {
+					return err
+				}
+				if err := w.dupKVStore.addRawKV(idxKey, idxVal); err != nil {
+					return err
+				}
+				w.recordedDupCnt++
+			}
+		}
+	} else {
+		return w.onNextPivot(ctx, idxKey, idxVal)
+	}
+	return nil
+}
+
+func (w *OneFileWriter) onNextPivot(ctx context.Context, idxKey, idxVal []byte) error {
+	if w.currDupCnt == 1 {
+		// the last pivot has no duplicates.
+		if err := w.doWriteRow(ctx, w.pivotKey, w.pivotValue); err != nil {
+			return err
+		}
+	}
+	if idxKey != nil {
+		w.pivotKey = slices.Clone(idxKey)
+		w.pivotValue = slices.Clone(idxVal)
+		w.currDupCnt = 1
+	} else {
+		w.pivotKey, w.pivotValue = nil, nil
+		w.currDupCnt = 0
+	}
+	return nil
+}
+
+func (w *OneFileWriter) handlePivotOnClose(ctx context.Context) error {
+	return w.onNextPivot(ctx, nil, nil)
+}
+
+func (w *OneFileWriter) doWriteRow(ctx context.Context, idxKey, idxVal []byte) error {
 	if w.minKey == nil {
 		w.minKey = slices.Clone(idxKey)
 	}
+	if err := w.lazyInitWriter(ctx); err != nil {
+		return err
+	}
 	// 1. encode data and write to kvStore.
 	keyLen := len(idxKey)
 	length := len(idxKey) + len(idxVal) + lengthBytes*2
@@ -128,11 +233,8 @@ func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) err
 		// the new prop should have the same offset with kvStore.
 		w.rc.currProp.offset = w.kvStore.offset
 	}
-	binary.BigEndian.AppendUint64(buf[:0], uint64(keyLen))
-	binary.BigEndian.AppendUint64(buf[lengthBytes:lengthBytes], uint64(len(idxVal)))
-	copy(buf[lengthBytes*2:], idxKey)
+	encodeToBuf(buf, idxKey, idxVal)
 	w.maxKey = buf[lengthBytes*2 : lengthBytes*2+keyLen]
-	copy(buf[lengthBytes*2+keyLen:], idxVal)
 	err := w.kvStore.addEncodedData(buf[:length])
 	if err != nil {
 		return err
@@ -151,22 +253,33 @@ func (w *OneFileWriter) Close(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	w.logger.Info("close one file writer",
-		zap.String("writerID", w.writerID))
+	w.logger.Info("close one file writer", zap.String("writerID", w.writerID))
 
-	maxKey := slices.Clone(w.maxKey)
-	var stat MultipleFilesStat
-	stat.Filenames = append(stat.Filenames,
-		[2]string{w.dataFile, w.statFile})
-	stat.build([]tidbkv.Key{w.minKey}, []tidbkv.Key{maxKey})
+	var minKey, maxKey []byte
+	mStats := make([]MultipleFilesStat, 0, 1)
+	if w.totalCnt > 0 {
+		// it's possible that all KV pairs are duplicates and removed.
+		minKey = w.minKey
+		maxKey = slices.Clone(w.maxKey)
+		var stat MultipleFilesStat
+		stat.Filenames = append(stat.Filenames, [2]string{w.dataFile, w.statFile})
+		stat.build([]tidbkv.Key{w.minKey}, []tidbkv.Key{maxKey})
+		mStats = append(mStats, stat)
+	}
+	conflictInfo := engineapi.ConflictInfo{}
+	if w.recordedDupCnt > 0 {
+		conflictInfo.Count = uint64(w.recordedDupCnt)
+		conflictInfo.Files = []string{w.dupFile}
+	}
 	w.onClose(&WriterSummary{
 		WriterID:           w.writerID,
 		Seq:                0,
-		Min:                w.minKey,
+		Min:                minKey,
 		Max:                maxKey,
 		TotalSize:          w.totalSize,
 		TotalCnt:           w.totalCnt,
-		MultipleFilesStats: []MultipleFilesStat{stat},
+		MultipleFilesStats: mStats,
+		ConflictInfo:       conflictInfo,
 	})
 	w.totalCnt = 0
 	w.totalSize = 0
@@ -175,27 +288,50 @@ func (w *OneFileWriter) Close(ctx context.Context) error {
 }
 
 func (w *OneFileWriter) closeImpl(ctx context.Context) (err error) {
-	// 1. write remaining statistic.
-	w.kvStore.finish()
-	encodedStat := w.rc.encode()
-	_, err = w.statWriter.Write(ctx, encodedStat)
-	if err != nil {
-		return err
-	}
-	w.rc.reset()
-	// 2. close data writer.
-	err1 := w.dataWriter.Close(ctx)
-	if err1 != nil {
-		err = err1
-		w.logger.Error("Close data writer failed", zap.Error(err))
+	if err = w.handlePivotOnClose(ctx); err != nil {
 		return
 	}
-	// 3. close stat writer.
-	err2 := w.statWriter.Close(ctx)
-	if err2 != nil {
-		err = err2
-		w.logger.Error("Close stat writer failed", zap.Error(err))
-		return
+	if w.dataWriter != nil {
+		// 1. write remaining statistic.
+		w.kvStore.finish()
+		encodedStat := w.rc.encode()
+		_, err = w.statWriter.Write(ctx, encodedStat)
+		if err != nil {
+			return err
+		}
+		w.rc.reset()
+		// 2. close data writer.
+		err1 := w.dataWriter.Close(ctx)
+		if err1 != nil {
+			err = err1
+			w.logger.Error("Close data writer failed", zap.Error(err))
+			return
+		}
+		// 3. close stat writer.
+		err2 := w.statWriter.Close(ctx)
+		if err2 != nil {
+			err = err2
+			w.logger.Error("Close stat writer failed", zap.Error(err))
+			return
+		}
+	}
+	if w.dupWriter != nil {
+		w.dupKVStore.finish()
+		if err3 := w.dupWriter.Close(ctx); err3 != nil {
+			err = err3
+			w.logger.Error("Close dup writer failed", zap.Error(err))
+			return
+		}
 	}
 	return nil
 }
+
+// caller should make sure the buf is large enough to hold the encoded data.
+func encodeToBuf(buf, key, value []byte) {
+	intest.Assert(len(buf) == lengthBytes*2+len(key)+len(value))
+	keyLen := len(key)
+	binary.BigEndian.AppendUint64(buf[:0], uint64(keyLen))
+	binary.BigEndian.AppendUint64(buf[lengthBytes:lengthBytes], uint64(len(value)))
+	copy(buf[lengthBytes*2:], key)
+	copy(buf[lengthBytes*2+keyLen:], value)
+}
diff --git a/pkg/lightning/backend/external/onefile_writer_test.go b/pkg/lightning/backend/external/onefile_writer_test.go
index 97780b032a..9c996f3851 100644
--- a/pkg/lightning/backend/external/onefile_writer_test.go
+++ b/pkg/lightning/backend/external/onefile_writer_test.go
@@ -51,7 +51,7 @@ func TestOnefileWriterBasic(t *testing.T) {
 		SetPropKeysDistance(2).
 		BuildOneFile(memStore, "/test", "0")
 
-	require.NoError(t, writer.Init(ctx, 5*1024*1024))
+	writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
 
 	kvCnt := 100
 	kvs := make([]common.KvPair, kvCnt)
@@ -122,7 +122,7 @@ func checkOneFileWriterStatWithDistance(t *testing.T, kvCnt int, keysDistance ui
 		SetPropKeysDistance(keysDistance).
 		BuildOneFile(memStore, "/"+prefix, "0")
 
-	require.NoError(t, writer.Init(ctx, 5*1024*1024))
+	writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
 	kvs := make([]common.KvPair, 0, kvCnt)
 	for i := 0; i < kvCnt; i++ {
 		kvs = append(kvs, common.KvPair{
@@ -267,7 +267,7 @@ func TestOnefileWriterManyRows(t *testing.T) {
 		SetMemorySizeLimit(1000).
 		BuildOneFile(memStore, "/test", "0")
 
-	require.NoError(t, writer.Init(ctx, 5*1024*1024))
+	writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
 
 	kvCnt := 100000
 	expectedTotalSize := 0
@@ -366,7 +366,7 @@ func TestOnefilePropOffset(t *testing.T) {
 		SetMemorySizeLimit(uint64(memSizeLimit)).
 		BuildOneFile(memStore, "/test", "0")
 
-	require.NoError(t, writer.Init(ctx, 5*1024*1024))
+	writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
 
 	kvCnt := 10000
 	kvs := make([]common.KvPair, kvCnt)
@@ -399,3 +399,21 @@ func TestOnefilePropOffset(t *testing.T) {
 		lastOffset = prop.offset
 	}
 }
+
+type testOneFileWriter struct {
+	*OneFileWriter
+}
+
+func (w *testOneFileWriter) WriteRow(ctx context.Context, key, val []byte, _ dbkv.Handle) error {
+	return w.OneFileWriter.WriteRow(ctx, key, val)
+}
+
+func TestOnefileWriterOnDup(t *testing.T) {
+	getWriterFn := func(store storage.ExternalStorage, b *WriterBuilder) testWriter {
+		writer := b.BuildOneFile(store, "/onefile", "0")
+		writer.InitPartSizeAndLogger(context.Background(), 1024)
+		return &testOneFileWriter{OneFileWriter: writer}
+	}
+	doTestWriterOnDupRecord(t, true, getWriterFn)
+	doTestWriterOnDupRemove(t, true, getWriterFn)
+}
diff --git a/pkg/lightning/backend/external/reader_test.go b/pkg/lightning/backend/external/reader_test.go
index dbac2187e3..e40549ecfc 100644
--- a/pkg/lightning/backend/external/reader_test.go
+++ b/pkg/lightning/backend/external/reader_test.go
@@ -85,7 +85,7 @@ func TestReadAllOneFile(t *testing.T) {
 		SetMemorySizeLimit(uint64(memSizeLimit)).
 		BuildOneFile(memStore, "/test", "0")
 
-	require.NoError(t, w.Init(ctx, int64(5*size.MB)))
+	w.InitPartSizeAndLogger(ctx, int64(5*size.MB))
 
 	kvCnt := rand.Intn(10) + 10000
 	kvs := make([]common.KvPair, kvCnt)
@@ -123,7 +123,7 @@ func TestReadLargeFile(t *testing.T) {
 		SetPropKeysDistance(1000).
 		BuildOneFile(memStore, "/test", "0")
 
-	require.NoError(t, w.Init(ctx, int64(5*size.MB)))
+	w.InitPartSizeAndLogger(ctx, int64(5*size.MB))
 
 	val := make([]byte, 10000)
 	for i := 0; i < 10000; i++ {
diff --git a/pkg/lightning/backend/external/split_test.go b/pkg/lightning/backend/external/split_test.go
index b2cceaf932..d1b872e50d 100644
--- a/pkg/lightning/backend/external/split_test.go
+++ b/pkg/lightning/backend/external/split_test.go
@@ -414,10 +414,9 @@ func Test3KFilesRangeSplitter(t *testing.T) {
 		SetPropSizeDistance(size.MB).
 		SetOnCloseFunc(onClose).
BuildOneFile(store, "/mock-test", uuid.New().String()) - err := w.Init(ctx, int64(5*size.MB)) - require.NoError(t, err) + w.InitPartSizeAndLogger(ctx, int64(5*size.MB)) // we don't need data files - err = w.dataWriter.Close(ctx) + err := w.dataWriter.Close(ctx) require.NoError(t, err) w.dataWriter = storage.NoopWriter{} diff --git a/pkg/lightning/backend/external/writer_test.go b/pkg/lightning/backend/external/writer_test.go index 1f22b5e5e3..98e67318fc 100644 --- a/pkg/lightning/backend/external/writer_test.go +++ b/pkg/lightning/backend/external/writer_test.go @@ -22,6 +22,7 @@ import ( "io" "path" "slices" + "sort" "strconv" "strings" "testing" @@ -606,7 +607,7 @@ func TestGetAdjustedIndexBlockSize(t *testing.T) { require.EqualValues(t, 16*units.MiB, GetAdjustedBlockSize(166*units.MiB)) } -func readKVFile(t *testing.T, store *writerFirstCloseFailStorage, filename string) []kvPair { +func readKVFile(t *testing.T, store storage.ExternalStorage, filename string) []kvPair { t.Helper() reader, err := newKVReader(context.Background(), filename, store, 0, units.KiB) require.NoError(t, err) @@ -622,15 +623,44 @@ func readKVFile(t *testing.T, store *writerFirstCloseFailStorage, filename strin return kvs } -func TestWriterOnDupRecord(t *testing.T) { - ctx := context.Background() - store := &writerFirstCloseFailStorage{ExternalStorage: storage.NewMemStorage(), shouldFail: true} - var summary *WriterSummary +type testWriter interface { + WriteRow(ctx context.Context, key, val []byte, handle dbkv.Handle) error + Close(ctx context.Context) error +} - t.Run("all duplicated, flush once", func(t *testing.T) { - writer := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240). - SetOnCloseFunc(func(s *WriterSummary) { summary = s }).SetOnDup(engineapi.OnDuplicateKeyRecord). 
- Build(store, "/test", "0") +func TestWriterOnDup(t *testing.T) { + getWriterFn := func(store storage.ExternalStorage, b *WriterBuilder) testWriter { + return b.Build(store, "/test", "0") + } + doTestWriterOnDupRecord(t, false, getWriterFn) + doTestWriterOnDupRemove(t, false, getWriterFn) +} + +func doTestWriterOnDupRecord(t *testing.T, testingOneFile bool, getWriter func(store storage.ExternalStorage, b *WriterBuilder) testWriter) { + t.Helper() + ctx := context.Background() + store := storage.NewMemStorage() + var summary *WriterSummary + doGetWriter := func(store storage.ExternalStorage, builder *WriterBuilder) testWriter { + builder = builder.SetOnCloseFunc(func(s *WriterSummary) { summary = s }).SetOnDup(engineapi.OnDuplicateKeyRecord) + return getWriter(store, builder) + } + + t.Run("write nothing", func(t *testing.T) { + builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240) + writer := doGetWriter(store, builder) + require.NoError(t, writer.Close(ctx)) + require.Empty(t, summary.Min) + require.Empty(t, summary.Max) + require.Zero(t, summary.TotalCnt) + require.Zero(t, summary.TotalSize) + require.Zero(t, summary.ConflictInfo.Count) + require.Empty(t, summary.ConflictInfo.Files) + }) + + t.Run("all duplicated", func(t *testing.T) { + builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240) + writer := doGetWriter(store, builder) for range 5 { require.NoError(t, writer.WriteRow(ctx, []byte("1111"), []byte("vvvv"), nil)) } @@ -649,12 +679,50 @@ func TestWriterOnDupRecord(t *testing.T) { } }) - t.Run("with different duplicated kv, flush twice", func(t *testing.T) { + t.Run("with different duplicated kv, first kv not duplicated", func(t *testing.T) { // each KV will take 24 bytes, so we flush every 10 KVs - writer := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240). - SetOnCloseFunc(func(s *WriterSummary) { summary = s }).SetOnDup(engineapi.OnDuplicateKeyRecord). 
- Build(store, "/test", "0") - for _, p := range []struct { + builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240) + writer := doGetWriter(store, builder) + input := []struct { + pair *kvPair + cnt int + }{ + {pair: &kvPair{key: []byte("2222"), value: []byte("vvvv")}, cnt: 1}, + {pair: &kvPair{key: []byte("1111"), value: []byte("vvvv")}, cnt: 1}, + {pair: &kvPair{key: []byte("6666"), value: []byte("vvvv")}, cnt: 3}, + {pair: &kvPair{key: []byte("7777"), value: []byte("vvvv")}, cnt: 5}, + } + if testingOneFile { + sort.Slice(input, func(i, j int) bool { + return bytes.Compare(input[i].pair.key, input[j].pair.key) < 0 + }) + } + for _, p := range input { + for i := 0; i < p.cnt; i++ { + require.NoError(t, writer.WriteRow(ctx, p.pair.key, p.pair.value, nil)) + } + } + require.NoError(t, writer.Close(ctx)) + require.EqualValues(t, []byte("1111"), summary.Min) + require.EqualValues(t, []byte("7777"), summary.Max) + require.EqualValues(t, 6, summary.TotalCnt) + require.EqualValues(t, 48, summary.TotalSize) + require.EqualValues(t, 4, summary.ConflictInfo.Count) + require.Len(t, summary.ConflictInfo.Files, 1) + kvs := readKVFile(t, store, summary.ConflictInfo.Files[0]) + require.EqualValues(t, []kvPair{ + {key: []byte("6666"), value: []byte("vvvv")}, + {key: []byte("7777"), value: []byte("vvvv")}, + {key: []byte("7777"), value: []byte("vvvv")}, + {key: []byte("7777"), value: []byte("vvvv")}, + }, kvs) + }) + + t.Run("with different duplicated kv, first kv duplicated", func(t *testing.T) { + // each KV will take 24 bytes, so we flush every 10 KVs + builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240) + writer := doGetWriter(store, builder) + input := []struct { pair *kvPair cnt int }{ @@ -665,7 +733,13 @@ func TestWriterOnDupRecord(t *testing.T) { {pair: &kvPair{key: []byte("7777"), value: []byte("vvvv")}, cnt: 1}, {pair: &kvPair{key: []byte("4444"), value: []byte("vvvv")}, cnt: 4}, {pair: &kvPair{key: []byte("3333"), value: []byte("vvvv")}, cnt: 4}, - } { + } + if testingOneFile { + sort.Slice(input, func(i, j int) bool { + return bytes.Compare(input[i].pair.key, input[j].pair.key) < 0 + }) + } + for _, p := range input { for i := 0; i < p.cnt; i++ { require.NoError(t, writer.WriteRow(ctx, p.pair.key, p.pair.value, nil)) } @@ -676,33 +750,64 @@ func TestWriterOnDupRecord(t *testing.T) { require.EqualValues(t, 12, summary.TotalCnt) require.EqualValues(t, 96, summary.TotalSize) require.EqualValues(t, 8, summary.ConflictInfo.Count) - require.Len(t, summary.ConflictInfo.Files, 2) - kvs := readKVFile(t, store, summary.ConflictInfo.Files[0]) - require.EqualValues(t, []kvPair{ - {key: []byte("1111"), value: []byte("vvvv")}, - {key: []byte("1111"), value: []byte("vvvv")}, - {key: []byte("1111"), value: []byte("vvvv")}, - {key: []byte("2222"), value: []byte("vvvv")}, - }, kvs) - kvs = readKVFile(t, store, summary.ConflictInfo.Files[1]) - require.EqualValues(t, []kvPair{ - {key: []byte("3333"), value: []byte("vvvv")}, - {key: []byte("3333"), value: []byte("vvvv")}, - {key: []byte("4444"), value: []byte("vvvv")}, - {key: []byte("4444"), value: []byte("vvvv")}, - }, kvs) + if testingOneFile { + require.Len(t, summary.ConflictInfo.Files, 1) + kvs := readKVFile(t, store, summary.ConflictInfo.Files[0]) + require.EqualValues(t, []kvPair{ + {key: []byte("1111"), value: []byte("vvvv")}, + {key: []byte("1111"), value: []byte("vvvv")}, + {key: []byte("1111"), value: []byte("vvvv")}, + {key: []byte("2222"), value: []byte("vvvv")}, + 
{key: []byte("3333"), value: []byte("vvvv")}, + {key: []byte("3333"), value: []byte("vvvv")}, + {key: []byte("4444"), value: []byte("vvvv")}, + {key: []byte("4444"), value: []byte("vvvv")}, + }, kvs) + } else { + require.Len(t, summary.ConflictInfo.Files, 2) + kvs := readKVFile(t, store, summary.ConflictInfo.Files[0]) + require.EqualValues(t, []kvPair{ + {key: []byte("1111"), value: []byte("vvvv")}, + {key: []byte("1111"), value: []byte("vvvv")}, + {key: []byte("1111"), value: []byte("vvvv")}, + {key: []byte("2222"), value: []byte("vvvv")}, + }, kvs) + kvs = readKVFile(t, store, summary.ConflictInfo.Files[1]) + require.EqualValues(t, []kvPair{ + {key: []byte("3333"), value: []byte("vvvv")}, + {key: []byte("3333"), value: []byte("vvvv")}, + {key: []byte("4444"), value: []byte("vvvv")}, + {key: []byte("4444"), value: []byte("vvvv")}, + }, kvs) + } }) } -func TestWriterOnDupRemove(t *testing.T) { +func doTestWriterOnDupRemove(t *testing.T, testingOneFile bool, getWriter func(storage.ExternalStorage, *WriterBuilder) testWriter) { + t.Helper() ctx := context.Background() - store := &writerFirstCloseFailStorage{ExternalStorage: storage.NewMemStorage(), shouldFail: true} + store := storage.NewMemStorage() var summary *WriterSummary + doGetWriter := func(store storage.ExternalStorage, builder *WriterBuilder) testWriter { + builder = builder.SetOnCloseFunc(func(s *WriterSummary) { summary = s }).SetOnDup(engineapi.OnDuplicateKeyRemove) + return getWriter(store, builder) + } - t.Run("all duplicated, flush once", func(t *testing.T) { - writer := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240). - SetOnCloseFunc(func(s *WriterSummary) { summary = s }).SetOnDup(engineapi.OnDuplicateKeyRemove). - Build(store, "/test", "0") + t.Run("write nothing", func(t *testing.T) { + builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240) + writer := doGetWriter(store, builder) + require.NoError(t, writer.Close(ctx)) + require.Empty(t, summary.Min) + require.Empty(t, summary.Max) + require.Zero(t, summary.TotalCnt) + require.Zero(t, summary.TotalSize) + require.Zero(t, summary.ConflictInfo.Count) + require.Empty(t, summary.ConflictInfo.Files) + }) + + t.Run("all duplicated", func(t *testing.T) { + builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240) + writer := doGetWriter(store, builder) for range 5 { require.NoError(t, writer.WriteRow(ctx, []byte("1111"), []byte("vvvv"), nil)) } @@ -716,12 +821,43 @@ func TestWriterOnDupRemove(t *testing.T) { require.Empty(t, summary.ConflictInfo.Files) }) - t.Run("with different duplicated kv, flush twice", func(t *testing.T) { + t.Run("with different duplicated kv, first kv not duplicated", func(t *testing.T) { // each KV will take 24 bytes, so we flush every 10 KVs - writer := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240). - SetOnCloseFunc(func(s *WriterSummary) { summary = s }).SetOnDup(engineapi.OnDuplicateKeyRemove). 
-			Build(store, "/test", "0")
-		for _, p := range []struct {
+		builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240)
+		writer := doGetWriter(store, builder)
+		input := []struct {
+			pair *kvPair
+			cnt  int
+		}{
+			{pair: &kvPair{key: []byte("2222"), value: []byte("vvvv")}, cnt: 1},
+			{pair: &kvPair{key: []byte("1111"), value: []byte("vvvv")}, cnt: 1},
+			{pair: &kvPair{key: []byte("6666"), value: []byte("vvvv")}, cnt: 3},
+			{pair: &kvPair{key: []byte("7777"), value: []byte("vvvv")}, cnt: 5},
+		}
+		if testingOneFile {
+			sort.Slice(input, func(i, j int) bool {
+				return bytes.Compare(input[i].pair.key, input[j].pair.key) < 0
+			})
+		}
+		for _, p := range input {
+			for i := 0; i < p.cnt; i++ {
+				require.NoError(t, writer.WriteRow(ctx, p.pair.key, p.pair.value, nil))
+			}
+		}
+		require.NoError(t, writer.Close(ctx))
+		require.EqualValues(t, []byte("1111"), summary.Min)
+		require.EqualValues(t, []byte("2222"), summary.Max)
+		require.EqualValues(t, 2, summary.TotalCnt)
+		require.EqualValues(t, 16, summary.TotalSize)
+		require.EqualValues(t, 0, summary.ConflictInfo.Count)
+		require.Empty(t, summary.ConflictInfo.Files)
+	})
+
+	t.Run("with different duplicated kv, first kv duplicated", func(t *testing.T) {
+		// each KV will take 24 bytes, so we flush every 10 KVs
+		builder := NewWriterBuilder().SetPropKeysDistance(4).SetMemorySizeLimit(240).SetBlockSize(240)
+		writer := doGetWriter(store, builder)
+		input := []struct {
 			pair *kvPair
 			cnt  int
 		}{
@@ -732,7 +868,13 @@ func TestWriterOnDupRemove(t *testing.T) {
 			{pair: &kvPair{key: []byte("7777"), value: []byte("vvvv")}, cnt: 1},
 			{pair: &kvPair{key: []byte("4444"), value: []byte("vvvv")}, cnt: 4},
 			{pair: &kvPair{key: []byte("3333"), value: []byte("vvvv")}, cnt: 4},
-		} {
+		}
+		if testingOneFile {
+			sort.Slice(input, func(i, j int) bool {
+				return bytes.Compare(input[i].pair.key, input[j].pair.key) < 0
+			})
+		}
+		for _, p := range input {
 			for i := 0; i < p.cnt; i++ {
 				require.NoError(t, writer.WriteRow(ctx, p.pair.key, p.pair.value, nil))
 			}