// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package extsort

import (
	"bytes"
	"container/heap"
	"context"
	"encoding/json"
	goerrors "errors"
	"fmt"
	"io"
	"runtime"
	"slices"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
	"github.com/cockroachdb/pebble/sstable"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/pingcap/errors"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/pkg/util/generic"
	"github.com/pingcap/tidb/pkg/util/syncutil"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)

const (
	sstFileSuffix            = ".sst"
	tmpFileSuffix            = ".tmp"
	kvStatsPropKey           = "extsort.kvstats"
	defaultKVStatsBucketSize = 1 << 20

	// If the file exists, it means the disk sorter is sorted.
	diskSorterSortedFile   = "sorted"
	diskSorterStateWriting = 0
	diskSorterStateSorting = 1
	diskSorterStateSorted  = 2
)

type fileMetadata struct {
	fileNum  int
	startKey []byte // inclusive
	endKey   []byte // exclusive
	lastKey  []byte // the last key in the file
	kvStats  kvStats
}

type kvStats struct {
	Histogram []kvStatsBucket `json:"histogram"`
}

type kvStatsBucket struct {
	// Size is the kv size between the upper bound of the previous bucket
	// (exclusive) and the upper bound of the current bucket (inclusive).
	Size int `json:"size"`
	// UpperBound is the upper inclusive bound of the bucket.
	UpperBound []byte `json:"upperBound"`
}
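
// For illustration only (hypothetical numbers): with a bucket size of 100
// bytes, a stream of key-value pairs whose cumulative size first reaches 100
// bytes at key "h" and again at key "q", with 40 bytes remaining up to the
// last key "z", yields a histogram like
//
//	{Size: 104, UpperBound: "h"}, {Size: 101, UpperBound: "q"}, {Size: 40, UpperBound: "z"}
//
// i.e. each bucket records the kv size accumulated since the previous
// bucket's upper bound, and the final bucket holds the remainder.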

// kvStatsCollector implements sstable.TablePropertyCollector.
// It collects kv stats of the sstables.
type kvStatsCollector struct {
	bucketSize int
	buckets    []kvStatsBucket
	curSize    int
	lastKey    []byte
}

func newKVStatsCollector(bucketSize int) *kvStatsCollector {
	return &kvStatsCollector{
		bucketSize: bucketSize,
	}
}

// Add is called when a key-value pair is added to the sstable.
// In our case, it's guaranteed that the key-value pairs are added in order,
// and key.Trailer must be sstable.InternalKeyKindSet.
func (c *kvStatsCollector) Add(key sstable.InternalKey, value []byte) error {
	c.curSize += len(key.UserKey) + len(value)
	c.lastKey = append(c.lastKey[:0], key.UserKey...)
	if c.curSize >= c.bucketSize {
		c.addBucket()
	}
	return nil
}

func (c *kvStatsCollector) addBucket() {
	c.buckets = append(c.buckets, kvStatsBucket{
		Size:       c.curSize,
		UpperBound: slices.Clone(c.lastKey),
	})
	c.curSize = 0
}

func (c *kvStatsCollector) Finish(userProps map[string]string) error {
	if c.curSize > 0 {
		c.addBucket()
	}
	stats := kvStats{Histogram: c.buckets}
	data, err := json.Marshal(&stats)
	if err != nil {
		return errors.Trace(err)
	}
	userProps[kvStatsPropKey] = string(data)
	return nil
}

func (*kvStatsCollector) Name() string {
	return kvStatsPropKey
}

func makeFilename(fs vfs.FS, dirname string, fileNum int) string {
	return fs.PathJoin(dirname, fmt.Sprintf("%06d%s", fileNum, sstFileSuffix))
}

func parseFilename(fs vfs.FS, filename string) (fileNum int, ok bool) {
	filename = fs.PathBase(filename)
	if !strings.HasSuffix(filename, sstFileSuffix) {
		return 0, false
	}
	v, err := strconv.ParseInt(filename[:len(filename)-len(sstFileSuffix)], 10, 64)
	if err != nil {
		return 0, false
	}
	return int(v), true
}

type sstWriter struct {
	w         *sstable.Writer
	fs        vfs.FS
	fileNum   int
	tmpPath   string
	destPath  string
	err       error
	onSuccess func(meta *fileMetadata)
}

func newSSTWriter(
	fs vfs.FS,
	dirname string,
	fileNum int,
	kvStatsBucketSize int,
	onSuccess func(meta *fileMetadata),
) (*sstWriter, error) {
	destPath := makeFilename(fs, dirname, fileNum)
	tmpPath := destPath + tmpFileSuffix
	f, err := fs.Create(tmpPath)
	if err != nil {
		return nil, errors.Trace(err)
	}
	writable := objstorageprovider.NewFileWritable(f)
	w := sstable.NewWriter(writable, sstable.WriterOptions{
		TablePropertyCollectors: []func() sstable.TablePropertyCollector{
			func() sstable.TablePropertyCollector {
				return newKVStatsCollector(kvStatsBucketSize)
			},
		},
	})

	sw := &sstWriter{
		w:         w,
		fs:        fs,
		fileNum:   fileNum,
		tmpPath:   tmpPath,
		destPath:  destPath,
		onSuccess: onSuccess,
	}
	return sw, nil
}

func (sw *sstWriter) Set(key, value []byte) error {
	sw.err = sw.w.Set(key, value)
	return errors.Trace(sw.err)
}

func (sw *sstWriter) Close() (retErr error) {
	defer func() {
		if retErr == nil {
			retErr = sw.fs.Rename(sw.tmpPath, sw.destPath)
		} else {
			_ = sw.fs.Remove(sw.tmpPath)
		}
	}()
	closeErr := sw.w.Close()
	if err := goerrors.Join(sw.err, closeErr); err != nil {
		return errors.Trace(err)
	}

	if sw.onSuccess != nil {
		writerMeta, err := sw.w.Metadata()
		if err != nil {
			return errors.Trace(err)
		}
		meta := &fileMetadata{
			fileNum: sw.fileNum,
		}
		meta.startKey = slices.Clone(writerMeta.SmallestPoint.UserKey)
		meta.lastKey = slices.Clone(writerMeta.LargestPoint.UserKey)
		// Make endKey exclusive. To avoid unnecessary overlap,
		// we append 0 so that endKey becomes the smallest key that is
		// greater than the last key.
		meta.endKey = append(meta.lastKey, 0)
		prop, ok := writerMeta.Properties.UserProperties[kvStatsPropKey]
		if ok {
			if err := json.Unmarshal([]byte(prop), &meta.kvStats); err != nil {
				return errors.Trace(err)
			}
		}
		sw.onSuccess(meta)
	}
	return nil
}

type sstReaderPool struct {
	fs      vfs.FS
	dirname string
	cache   *pebble.Cache
	mu      struct {
		syncutil.RWMutex
		readers map[int]struct {
			reader *sstable.Reader
			refs   *atomic.Int32
		} // fileNum -> reader and its refs
	}
}

func newSSTReaderPool(fs vfs.FS, dirname string, cache *pebble.Cache) *sstReaderPool {
	pool := &sstReaderPool{
		fs:      fs,
		dirname: dirname,
		cache:   cache,
	}
	pool.mu.readers = make(map[int]struct {
		reader *sstable.Reader
		refs   *atomic.Int32
	})
	return pool
}

func (p *sstReaderPool) get(fileNum int) (*sstable.Reader, error) {
	var reader *sstable.Reader
	withLock(p.mu.RLocker(), func() {
		if v, ok := p.mu.readers[fileNum]; ok {
			v.refs.Add(1)
			reader = v.reader
		}
	})
	if reader != nil {
		return reader, nil
	}

	// Create a new reader. Note that we don't hold the lock here,
	// since creating a reader can be expensive, and we don't want
	// to block other readers' creation.
	f, err := p.fs.Open(makeFilename(p.fs, p.dirname, fileNum))
	if err != nil {
		return nil, errors.Trace(err)
	}
	readable, err := sstable.NewSimpleReadable(f)
	if err != nil {
		return nil, errors.Trace(err)
	}
	reader, err = sstable.NewReader(readable, sstable.ReaderOptions{
		Cache: p.cache,
	})
	if err != nil {
		return nil, errors.Trace(err)
	}

	var toRet, toClose *sstable.Reader
	withLock(&p.mu, func() {
		// Someone might have added the reader to the pool.
		if v, ok := p.mu.readers[fileNum]; ok {
			toClose = reader
			v.refs.Add(1)
			toRet = v.reader
			return
		}
		refs := new(atomic.Int32)
		refs.Store(1)
		p.mu.readers[fileNum] = struct {
			reader *sstable.Reader
			refs   *atomic.Int32
		}{reader: reader, refs: refs}
		toRet = reader
	})
	if toClose != nil {
		if err := toClose.Close(); err != nil {
			_ = p.unref(fileNum)
			return nil, errors.Trace(err)
		}
	}
	return toRet, nil
}

func (p *sstReaderPool) unref(fileNum int) error {
	var closer io.Closer
	withLock(&p.mu, func() {
		v, ok := p.mu.readers[fileNum]
		if !ok {
			panic(fmt.Sprintf("sstReaderPool: unref a reader that does not exist: %d", fileNum))
		}
		if v.refs.Add(-1) == 0 {
			delete(p.mu.readers, fileNum)
			closer = v.reader
		}
	})
	if closer != nil {
		return errors.Trace(closer.Close())
	}
	return nil
}
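
// Each successful get must eventually be paired with exactly one unref for the
// same fileNum, typically via a deferred call or an iterator's onClose hook,
// for example:
//
//	reader, err := pool.get(fileNum)
//	if err != nil {
//		return err
//	}
//	defer func() { _ = pool.unref(fileNum) }()
//
// When the reference count drops to zero, the reader is closed and removed
// from the pool.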

func withLock(l sync.Locker, f func()) {
	l.Lock()
	defer l.Unlock()
	f()
}

type sstIter struct {
	iter    sstable.Iterator
	key     *sstable.InternalKey
	value   []byte
	onClose func() error
}

func (si *sstIter) Seek(key []byte) bool {
	k, v := si.iter.SeekGE(key, 0)
	si.key = k
	si.value, _, _ = v.Value(nil)
	return si.key != nil
}

func (si *sstIter) First() bool {
	k, v := si.iter.SeekGE(nil, 0)
	si.key = k
	si.value, _, _ = v.Value(nil)
	return si.key != nil
}

func (si *sstIter) Next() bool {
	k, v := si.iter.Next()
	si.key = k
	si.value, _, _ = v.Value(nil)
	return si.key != nil
}

func (si *sstIter) Last() bool {
	k, v := si.iter.Last()
	si.key = k
	si.value, _, _ = v.Value(nil)
	return si.key != nil
}

func (si *sstIter) Valid() bool {
	return si.key != nil
}

func (si *sstIter) Error() error {
	return errors.Trace(si.iter.Error())
}

func (si *sstIter) UnsafeKey() []byte {
	return si.key.UserKey
}

func (si *sstIter) UnsafeValue() []byte {
	return si.value
}

func (si *sstIter) Close() error {
	err := si.iter.Close()
	if si.onClose != nil {
		err = goerrors.Join(err, si.onClose())
	}
	return errors.Trace(err)
}

type mergingIter struct {
	heap mergingIterHeap
	// orderedFiles is the files to be merged. It is ordered by start key.
	orderedFiles []*fileMetadata
	// nextFileIndex is the index of the next file to be read.
	// mergingIter only maintains the minimum set of opened files.
	// If the current key is less than the start key of the next file,
	// the next file does not need to be opened.
	nextFileIndex int
	openIter      openIterFunc
	curKey        []byte // only used in Next() to avoid alloc
	err           error
}

type mergingIterItem struct {
	Iterator
	index int // index in orderedFiles
}

type mergingIterHeap []mergingIterItem

func (h mergingIterHeap) Len() int {
	return len(h)
}

func (h mergingIterHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

func (h mergingIterHeap) Less(i, j int) bool {
	return bytes.Compare(h[i].UnsafeKey(), h[j].UnsafeKey()) < 0
}

func (h *mergingIterHeap) Push(x any) {
	*h = append(*h, x.(mergingIterItem))
}

func (h *mergingIterHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

type openIterFunc func(file *fileMetadata) (Iterator, error)

// newMergingIter returns an iterator that merges the given files.
// orderedFiles must be ordered by start key, otherwise it panics.
func newMergingIter(orderedFiles []*fileMetadata, openIter openIterFunc) *mergingIter {
	if !slices.IsSortedFunc(orderedFiles, func(a, b *fileMetadata) int {
		return bytes.Compare(a.startKey, b.startKey)
	}) {
		panic("newMergingIter: orderedFiles are not ordered by start key")
	}
	return &mergingIter{
		orderedFiles: orderedFiles,
		openIter:     openIter,
	}
}

func (m *mergingIter) Seek(key []byte) bool {
	m.err = nil
	oldHeap := m.heap
	m.heap = nil

	var toClose []mergingIterItem
	openedIters := make(map[int]struct{})
	for i, item := range oldHeap {
		file := m.orderedFiles[item.index]
		if bytes.Compare(key, file.startKey) >= 0 &&
			bytes.Compare(key, file.endKey) < 0 &&
			item.Seek(key) {
			openedIters[item.index] = struct{}{}
			heap.Push(&m.heap, item)
		} else {
			m.err = item.Error()
			if m.err != nil {
				// Iterators in m.heap should be closed by mergingIter.Close().
				toClose = append(toClose, oldHeap[i:]...)
				break
			}
			// The seek key is not in the range of [startKey, endKey), close it.
			toClose = append(toClose, item)
		}
	}
	for _, item := range toClose {
		if err := item.Close(); err != nil && m.err == nil {
			m.err = err
		}
	}
	if m.err != nil {
		return false
	}

	m.nextFileIndex = len(m.orderedFiles)
	for i, file := range m.orderedFiles {
		if bytes.Compare(file.startKey, key) > 0 {
			m.nextFileIndex = i
			break
		}
		if _, ok := openedIters[i]; ok {
			continue
		}
		if bytes.Compare(file.endKey, key) <= 0 {
			continue
		}
		iter, err := m.openIter(file)
		if err != nil {
			m.err = err
			return false
		}
		if iter.Seek(key) {
			heap.Push(&m.heap, mergingIterItem{
				Iterator: iter,
				index:    i,
			})
		} else {
			m.err = iter.Error()
			closeErr := iter.Close()
			m.err = goerrors.Join(m.err, closeErr)
			if m.err != nil {
				return false
			}
		}
	}
	// Although we have opened all files whose range contains the seek key,
	// it is possible that the key we landed on is greater than the original
	// seek key. So we need to check whether the next file needs to be opened.
	//
	// Consider the following case:
	//
	// file 1: a--------f
	// file 2:   b------f
	// file 3:       d--f
	//
	// If we seek to "c", we will open file 1 and file 2 first. Suppose the
	// smallest key greater than "c" in them is "e"; then we need to open
	// file 3 to check whether it has keys less than "e".
	return m.maybeOpenNextFiles()
}

func (m *mergingIter) First() bool {
	return m.Seek(nil)
}

func (m *mergingIter) Next() bool {
	m.err = nil
	m.curKey = append(m.curKey[:0], m.heap[0].UnsafeKey()...)
	//revive:disable:empty-block
	for m.next() && bytes.Equal(m.heap[0].UnsafeKey(), m.curKey) {
	}
	//revive:enable:empty-block
	return m.Valid()
}

func (m *mergingIter) next() bool {
	if len(m.heap) == 0 {
		return false
	}
	if m.heap[0].Next() {
		heap.Fix(&m.heap, 0)
	} else {
		m.err = m.heap[0].Error()
		if m.err != nil {
			return false
		}
		x := heap.Pop(&m.heap)
		item := x.(mergingIterItem)
		m.err = item.Close()
		if m.err != nil {
			return false
		}
	}
	return m.maybeOpenNextFiles()
}

func (m *mergingIter) maybeOpenNextFiles() bool {
	for m.nextFileIndex < len(m.orderedFiles) {
		file := m.orderedFiles[m.nextFileIndex]
		if len(m.heap) > 0 && bytes.Compare(m.heap[0].UnsafeKey(), file.startKey) < 0 {
			break
		}
		// The next file may overlap with the min key in the heap, so we need to
		// open it and push it into the heap.
		iter, err := m.openIter(file)
		if err != nil {
			m.err = err
			return false
		}
		if !iter.First() {
			m.err = iter.Error()
			closeErr := iter.Close()
			m.err = goerrors.Join(m.err, closeErr)
			if m.err != nil {
				return false
			}
		}
		heap.Push(&m.heap, mergingIterItem{
			Iterator: iter,
			index:    m.nextFileIndex,
		})
		m.nextFileIndex++
	}
	return len(m.heap) > 0
}

func (m *mergingIter) Last() bool {
	m.err = nil
	// Close all opened iterators.
	for _, item := range m.heap {
		if err := item.Close(); err != nil && m.err == nil {
			m.err = err
		}
	}
	m.heap = nil
	m.nextFileIndex = len(m.orderedFiles)
	if m.err != nil {
		return false
	}

	// Sort files by last key in reverse order.
	files := slices.Clone(m.orderedFiles)
	slices.SortFunc(files, func(a, b *fileMetadata) int {
		return bytes.Compare(b.lastKey, a.lastKey)
	})

	// Since we don't need to implement Prev() method,
	// we can just open the file with the largest last key.
	for _, file := range files {
		iter, err := m.openIter(file)
		if err != nil {
			m.err = err
			return false
		}
		if iter.Last() {
			index := -1
			for i := range m.orderedFiles {
				if m.orderedFiles[i] == file {
					index = i
					break
				}
			}
			heap.Push(&m.heap, mergingIterItem{
				Iterator: iter,
				index:    index,
			})
			break
		}
		m.err = iter.Error()
		// If the file is empty, we can close it and continue to open the next file.
		closeErr := iter.Close()
		m.err = goerrors.Join(m.err, closeErr)
		if m.err != nil {
			return false
		}
	}
	return m.Valid()
}

func (m *mergingIter) Valid() bool {
	return m.err == nil && len(m.heap) > 0
}

func (m *mergingIter) Error() error {
	return m.err
}

func (m *mergingIter) UnsafeKey() []byte {
	return m.heap[0].UnsafeKey()
}

func (m *mergingIter) UnsafeValue() []byte {
	return m.heap[0].UnsafeValue()
}

func (m *mergingIter) Close() error {
	m.err = nil
	for _, item := range m.heap {
		if err := item.Close(); err != nil && m.err == nil {
			m.err = err
		}
	}
	return m.err
}

// DiskSorterOptions holds the optional parameters for DiskSorter.
type DiskSorterOptions struct {
	// Concurrency is the maximum number of goroutines that can be used to
	// sort data in parallel.
	//
	// The default value is runtime.GOMAXPROCS(0).
	Concurrency int

	// WriterBufferSize is the size of the buffer used by the writer.
	// A larger buffer size can improve the write and sort performance,
	// and reduce the number of disk operations.
	//
	// The default value is 128MB.
	WriterBufferSize int

	// CompactionThreshold is the maximum overlap depth necessary to trigger a
	// compaction. The overlap depth is the number of files that overlap in
	// the same interval.
	//
	// For example, consider the following files:
	//
	// file 0: a-----d
	// file 1:   b-----e
	// file 2:     c-------g
	// file 3:       d---f
	//
	// The overlap depth of these files is 3, because files 0, 1, and 2 overlap
	// in the interval [c, d), and files 1, 2, and 3 overlap in the interval [d, e).
	//
	// If the overlap depth reaches CompactionThreshold, the sorter will compact
	// files to reduce the overlap depth during sorting. The larger the overlap
	// depth, the larger the read amplification during iteration. This is a
	// trade-off between read amplification and sorting cost. Setting this value
	// to math.MaxInt disables compaction.
	//
	// The default value is 16.
	CompactionThreshold int

	// MaxCompactionDepth is the maximum number of files involved in a single
	// compaction. The minimum value is 2. Any value less than 2 will be treated
	// as not set.
	//
	// The default value is 64.
	MaxCompactionDepth int

	// MaxCompactionSize is the maximum total size of key-value pairs involved
	// in a single compaction.
	//
	// The default value is 512MB.
	MaxCompactionSize int

	// Logger is used to write log messages.
	//
	// The default value is log.L().
	Logger *zap.Logger
}
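
// For example (the values are illustrative, not recommendations), a caller
// that wants a smaller write buffer and more aggressive compaction could pass:
//
//	opts := &DiskSorterOptions{
//		WriterBufferSize:    32 << 20,
//		CompactionThreshold: 8,
//	}
//	sorter, err := OpenDiskSorter(dirname, opts)
//
// Fields left at their zero value are filled in by ensureDefaults.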

func (o *DiskSorterOptions) ensureDefaults() {
	if o.Concurrency == 0 {
		o.Concurrency = runtime.GOMAXPROCS(0)
	}
	if o.WriterBufferSize == 0 {
		o.WriterBufferSize = 128 << 20
	}
	if o.CompactionThreshold == 0 {
		o.CompactionThreshold = 16
	}
	if o.MaxCompactionDepth < 2 {
		o.MaxCompactionDepth = 64
	}
	if o.MaxCompactionSize == 0 {
		o.MaxCompactionSize = 512 << 20
	}
	if o.Logger == nil {
		o.Logger = log.L()
	}
}

// DiskSorter is an external sorter that sorts data on disk.
type DiskSorter struct {
	opts    *DiskSorterOptions
	fs      vfs.FS
	dirname string
	// cache is shared by all sst readers.
	cache        *pebble.Cache
	readerPool   *sstReaderPool
	idAlloc      atomic.Int64
	state        atomic.Int32
	pendingFiles struct {
		syncutil.Mutex
		files []*fileMetadata
	}
	// The list of files that have been sorted.
	// They must be ordered by their start keys.
	orderedFiles []*fileMetadata
}

// OpenDiskSorter opens a DiskSorter with the given directory.
func OpenDiskSorter(dirname string, opts *DiskSorterOptions) (*DiskSorter, error) {
	if opts == nil {
		opts = &DiskSorterOptions{}
	}
	opts.ensureDefaults()

	fs := vfs.Default
	if dirname == "" {
		fs = vfs.NewMem()
	}
	if err := fs.MkdirAll(dirname, 0755); err != nil {
		return nil, errors.Trace(err)
	}
	cache := pebble.NewCache(8 << 20)
	readerPool := newSSTReaderPool(fs, dirname, cache)
	d := &DiskSorter{
		opts:       opts,
		fs:         fs,
		dirname:    dirname,
		cache:      cache,
		readerPool: readerPool,
	}
	if err := d.init(); err != nil {
		return nil, err
	}
	return d, nil
}
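
// A minimal lifecycle sketch (error handling elided; dirname and ctx are
// placeholders supplied by the caller):
//
//	sorter, _ := OpenDiskSorter(dirname, nil)
//	w, _ := sorter.NewWriter(ctx)
//	_ = w.Put([]byte("key"), []byte("value"))
//	_ = w.Close() // flushes buffered key-value pairs into SST files
//	_ = sorter.Sort(ctx)
//	iter, _ := sorter.NewIterator(ctx)
//	for iter.First(); iter.Valid(); iter.Next() {
//		// use iter.UnsafeKey() and iter.UnsafeValue()
//	}
//	_ = iter.Close()
//	_ = sorter.Close()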

func (d *DiskSorter) init() error {
	list, err := d.fs.List(d.dirname)
	if err != nil {
		return errors.Trace(err)
	}

	g, ctx := errgroup.WithContext(context.Background())
	outputCh := make(chan *fileMetadata, len(list))
	for _, name := range list {
		if strings.HasSuffix(name, tmpFileSuffix) {
			_ = d.fs.Remove(d.fs.PathJoin(d.dirname, name))
			continue
		}
		fileNum, ok := parseFilename(d.fs, name)
		if !ok {
			continue
		}
		if int64(fileNum) > d.idAlloc.Load() {
			d.idAlloc.Store(int64(fileNum))
		}
		g.Go(func() error {
			if ctx.Err() != nil {
				return errors.Trace(ctx.Err())
			}
			file, err := d.readFileMetadata(fileNum)
			if err != nil {
				return err
			}
			outputCh <- file
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return err
	}

	close(outputCh)
	files := make([]*fileMetadata, 0, len(outputCh))
	for file := range outputCh {
		files = append(files, file)
	}
	if _, err := d.fs.Stat(d.fs.PathJoin(d.dirname, diskSorterSortedFile)); err == nil {
		slices.SortFunc(files, func(a, b *fileMetadata) int {
			return bytes.Compare(a.startKey, b.startKey)
		})
		d.orderedFiles = files
		d.state.Store(diskSorterStateSorted)
	} else {
		d.pendingFiles.files = files
		d.state.Store(diskSorterStateWriting)
	}
	return nil
}

func (d *DiskSorter) readFileMetadata(fileNum int) (*fileMetadata, error) {
	reader, err := d.readerPool.get(fileNum)
	if err != nil {
		return nil, errors.Trace(err)
	}
	defer func() {
		_ = d.readerPool.unref(fileNum)
	}()

	meta := &fileMetadata{
		fileNum: fileNum,
	}
	if prop, ok := reader.Properties.UserProperties[kvStatsPropKey]; ok {
		if err := json.Unmarshal([]byte(prop), &meta.kvStats); err != nil {
			return nil, errors.Trace(err)
		}
	}

	iter, err := reader.NewIter(nil, nil)
	if err != nil {
		return nil, errors.Trace(err)
	}
	defer func() {
		_ = iter.Close()
	}()

	firstKey, _ := iter.First()
	if firstKey != nil {
		meta.startKey = slices.Clone(firstKey.UserKey)
	} else if err := iter.Error(); err != nil {
		return nil, errors.Trace(err)
	}

	lastKey, _ := iter.Last()
	if lastKey != nil {
		meta.lastKey = slices.Clone(lastKey.UserKey)
		// Make endKey exclusive. To avoid unnecessary overlap,
		// we append 0 so that endKey becomes the smallest key that is
		// greater than the last key.
		meta.endKey = append(meta.lastKey, 0)
	} else if err := iter.Error(); err != nil {
		return nil, errors.Trace(err)
	}
	return meta, nil
}

// NewWriter implements the ExternalSorter.NewWriter.
func (d *DiskSorter) NewWriter(_ context.Context) (Writer, error) {
	if d.state.Load() > diskSorterStateWriting {
		return nil, errors.Errorf("diskSorter started sorting, cannot write more data")
	}
	return &diskSorterWriter{
		buf: make([]byte, d.opts.WriterBufferSize),
		newSSTWriter: func() (*sstWriter, error) {
			fileNum := int(d.idAlloc.Add(1))
			return newSSTWriter(d.fs, d.dirname, fileNum, defaultKVStatsBucketSize,
				func(meta *fileMetadata) {
					d.pendingFiles.Lock()
					d.pendingFiles.files = append(d.pendingFiles.files, meta)
					d.pendingFiles.Unlock()
				})
		},
	}, nil
}

// Sort implements the ExternalSorter.Sort.
func (d *DiskSorter) Sort(ctx context.Context) error {
	if d.state.Load() == diskSorterStateSorted {
		return nil
	}
	d.state.Store(diskSorterStateSorting)
	if err := d.doSort(ctx); err != nil {
		return err
	}
	d.state.Store(diskSorterStateSorted)

	// Persist the sorted state.
	f, err := d.fs.Create(d.fs.PathJoin(d.dirname, diskSorterSortedFile))
	if err != nil {
		return errors.Trace(err)
	}
	return errors.Trace(f.Close())
}

func (d *DiskSorter) doSort(ctx context.Context) error {
	d.pendingFiles.Lock()
	defer d.pendingFiles.Unlock()
	if len(d.pendingFiles.files) == 0 {
		return nil
	}
	d.orderedFiles = d.pendingFiles.files
	d.pendingFiles.files = nil
	slices.SortFunc(d.orderedFiles, func(a, b *fileMetadata) int {
		return bytes.Compare(a.startKey, b.startKey)
	})
	files := pickCompactionFiles(d.orderedFiles, d.opts.CompactionThreshold, d.opts.Logger)
	for len(files) > 0 {
		if err := d.compactFiles(ctx, files); err != nil {
			return err
		}
		files = pickCompactionFiles(d.orderedFiles, d.opts.CompactionThreshold, d.opts.Logger)
	}
	return nil
}

func (d *DiskSorter) compactFiles(ctx context.Context, files []*fileMetadata) error {
	// Split files into multiple compaction groups.
	// Each group will be compacted independently.
	groups := splitCompactionFiles(files, d.opts.MaxCompactionDepth)
	compactions := make([]*compaction, 0, len(groups))
	for _, group := range groups {
		compactions = append(compactions, buildCompactions(group, d.opts.MaxCompactionSize)...)
	}

	// Build file references.
	fileRefs := make(map[int]*atomic.Int32)
	for _, file := range files {
		fileRefs[file.fileNum] = new(atomic.Int32)
	}
	for _, c := range compactions {
		for _, file := range c.overlapFiles {
			fileRefs[file.fileNum].Add(1)
		}
	}
	d.opts.Logger.Info("total compactions", zap.Int("count", len(compactions)))

	// Run all compactions.
	removedFileNums := generic.NewSyncMap[int, struct{}](len(files))
	outputCh := make(chan *fileMetadata, len(compactions))
	g, gCtx := errgroup.WithContext(ctx)
	g.SetLimit(d.opts.Concurrency)
	for _, c := range compactions {
		g.Go(func() error {
			if gCtx.Err() != nil {
				return errors.Trace(gCtx.Err())
			}
			output, err := d.runCompaction(gCtx, c)
			if err != nil {
				return err
			}
			outputCh <- output
			for _, file := range c.overlapFiles {
				if fileRefs[file.fileNum].Add(-1) == 0 {
					_ = d.fs.Remove(makeFilename(d.fs, d.dirname, file.fileNum))
					removedFileNums.Store(file.fileNum, struct{}{})
				}
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return err
	}

	// Update d.orderedFiles.
	close(outputCh)
	for output := range outputCh {
		d.orderedFiles = append(d.orderedFiles, output)
	}
	var newOrderedFiles []*fileMetadata
	for _, file := range d.orderedFiles {
		if _, ok := removedFileNums.Load(file.fileNum); !ok {
			newOrderedFiles = append(newOrderedFiles, file)
		}
	}
	slices.SortFunc(newOrderedFiles, func(a, b *fileMetadata) int {
		return bytes.Compare(a.startKey, b.startKey)
	})
	d.orderedFiles = newOrderedFiles
	return nil
}

func pickCompactionFiles(
	allFiles []*fileMetadata,
	compactionThreshold int,
	logger *zap.Logger,
) []*fileMetadata {
	type interval struct {
		key   []byte
		depth int
	}
	// intervals is a list of intervals whose endpoints are file boundaries.
	// For example, if we have two files [a, c] and [b, d], the intervals will be
	// [a, b), [b, c), [c, d).
	intervals := make([]interval, 0, len(allFiles)*2)
	for _, file := range allFiles {
		intervals = append(intervals, interval{
			key:   file.startKey,
			depth: 1,
		})
		intervals = append(intervals, interval{
			key:   file.endKey,
			depth: -1,
		})
	}
	slices.SortFunc(intervals, func(a, b interval) int {
		return bytes.Compare(a.key, b.key)
	})

	// Compute the maximum overlap depth of each interval.
	// See https://en.wikipedia.org/wiki/Sweep_line_algorithm.
	maxDepth := 0
	n := 0
	for i := 1; i < len(intervals); i++ {
		intervals[i].depth += intervals[n].depth
		if intervals[i].depth > maxDepth {
			maxDepth = intervals[i].depth
		}
		// Merge adjacent intervals with the same key.
		if bytes.Equal(intervals[i].key, intervals[n].key) {
			intervals[n] = intervals[i]
		} else {
			n++
			intervals[n] = intervals[i]
		}
	}
	intervals = intervals[:n+1]
	if maxDepth < compactionThreshold {
		return nil
	}

	var files []*fileMetadata
	for _, file := range allFiles {
		minIntervalIndex := sort.Search(len(intervals), func(i int) bool {
			return bytes.Compare(intervals[i].key, file.startKey) >= 0
		})
		maxIntervalIndex := sort.Search(len(intervals), func(i int) bool {
			return bytes.Compare(intervals[i].key, file.endKey) >= 0
		})
		for i := minIntervalIndex; i < maxIntervalIndex; i++ {
			if intervals[i].depth >= compactionThreshold {
				files = append(files, file)
				break
			}
		}
	}
	logger.Info(
		"max overlap depth reached the compaction threshold, pick files to compact",
		zap.Int("maxDepth", maxDepth),
		zap.Int("threshold", compactionThreshold),
		zap.Int("fileCount", len(files)),
	)
	return files
}

func splitCompactionFiles(files []*fileMetadata, maxCompactionDepth int) [][]*fileMetadata {
	// Split files into non-overlapping groups.
	slices.SortFunc(files, func(a, b *fileMetadata) int {
		return bytes.Compare(a.startKey, b.startKey)
	})
	var groups [][]*fileMetadata
	curGroup := []*fileMetadata{files[0]}
	maxEndKey := files[0].endKey
	for _, file := range files[1:] {
		if bytes.Compare(file.startKey, maxEndKey) >= 0 {
			groups = append(groups, curGroup)
			curGroup = []*fileMetadata{file}
		} else {
			curGroup = append(curGroup, file)
		}
		if bytes.Compare(file.endKey, maxEndKey) > 0 {
			maxEndKey = file.endKey
		}
	}
	if len(curGroup) > 0 {
		groups = append(groups, curGroup)
	}

	var finalGroups [][]*fileMetadata
	// Split each group into subgroups of at most maxCompactionDepth files,
	// so that a single compaction does not involve too many files.
	for _, group := range groups {
		numSubGroups := (len(group)-1)/maxCompactionDepth + 1
		subGroupSize := (len(group)-1)/numSubGroups + 1
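		// For example (illustrative numbers): with maxCompactionDepth = 64 and
		// len(group) = 130, numSubGroups = (130-1)/64 + 1 = 3 and
		// subGroupSize = (130-1)/3 + 1 = 44, so the group is split into
		// subgroups of 44, 44, and 42 files, each within the depth limit.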
		for i := 0; i < len(group); i += subGroupSize {
			j := min(i+subGroupSize, len(group))
			finalGroups = append(finalGroups, group[i:j])
		}
	}
	return finalGroups
}

type compaction struct {
	startKey []byte
	endKey   []byte
	// overlapFiles are files that overlap with the compaction range.
	// They are ordered by their start keys.
	overlapFiles []*fileMetadata
}

func buildCompactions(files []*fileMetadata, maxCompactionSize int) []*compaction {
	var startKey, endKey []byte
	buckets := make([]kvStatsBucket, 0, len(files))
	for _, file := range files {
		buckets = append(buckets, file.kvStats.Histogram...)
		if startKey == nil || bytes.Compare(file.startKey, startKey) < 0 {
			startKey = file.startKey
		}
		if endKey == nil || bytes.Compare(file.endKey, endKey) > 0 {
			endKey = file.endKey
		}
	}
	// If there are no kv stats, return a single compaction covering all files.
	if len(buckets) == 0 {
		overlapFiles := slices.Clone(files)
		slices.SortFunc(overlapFiles, func(a, b *fileMetadata) int {
			return bytes.Compare(a.startKey, b.startKey)
		})
		return []*compaction{{
			startKey:     startKey,
			endKey:       endKey,
			overlapFiles: overlapFiles,
		}}
	}

	slices.SortFunc(buckets, func(a, b kvStatsBucket) int {
		return bytes.Compare(a.UpperBound, b.UpperBound)
	})
	// Merge buckets with the same upper bound.
	n := 0
	for i := 1; i < len(buckets); i++ {
		if bytes.Equal(buckets[n].UpperBound, buckets[i].UpperBound) {
			buckets[n].Size += buckets[i].Size
		} else {
			n++
			buckets[n] = buckets[i]
		}
	}
	buckets = buckets[:n+1]

	var (
		kvSize      int
		compactions []*compaction
	)
	// We assume that data is uniformly distributed in each file as well as in
	// each bucket range. So we can simply merge buckets of different files.
	// Although adjacent buckets may not come from the same file, the current
	// bucket size is approximately the size of the kv pairs between the
	// previous bucket's upper bound and the current bucket's upper bound.
	//
	// Consider the following example:
	//
	// file 1 buckets: (a, 10), (c, 10), (e, 10)
	// file 2 buckets: (b, 10), (d, 10), (f, 10)
	// merged buckets: (a, 10), (b, 10), (c, 10), (d, 10), (e, 10), (f, 10).
	//
	// For the adjacent buckets (b, 10) and (c, 10), the size of the kv pairs
	// between b and c is approximately 10 (5 from file 1 and 5 from file 2).
	for i, bucket := range buckets {
		if i+1 == len(buckets) {
			compactions = append(compactions, &compaction{
				startKey: startKey,
				endKey:   endKey,
			})
			break
		}
		kvSize += bucket.Size
		if kvSize >= maxCompactionSize {
			compactions = append(compactions, &compaction{
				startKey: startKey,
				endKey:   bucket.UpperBound,
			})
			startKey = bucket.UpperBound
			kvSize = 0
		}
	}

	for _, c := range compactions {
		for _, file := range files {
			if !(bytes.Compare(file.endKey, c.startKey) <= 0 ||
				bytes.Compare(file.startKey, c.endKey) >= 0) {
				c.overlapFiles = append(c.overlapFiles, file)
			}
		}
		slices.SortFunc(c.overlapFiles, func(a, b *fileMetadata) int {
			return bytes.Compare(a.startKey, b.startKey)
		})
	}
	return compactions
}

func (d *DiskSorter) runCompaction(ctx context.Context, c *compaction) (*fileMetadata, error) {
	d.opts.Logger.Debug(
		"run compaction",
		zap.Binary("startKey", c.startKey),
		zap.Binary("endKey", c.endKey),
		zap.Int("fileCount", len(c.overlapFiles)),
	)

	var merged *fileMetadata
	onSuccess := func(meta *fileMetadata) { merged = meta }

	fileNum := int(d.idAlloc.Add(1))
	sw, err := newSSTWriter(d.fs, d.dirname, fileNum, defaultKVStatsBucketSize, onSuccess)
	if err != nil {
		return nil, err
	}
	swClosed := false
	defer func() {
		if !swClosed {
			_ = sw.Close()
		}
	}()

	iter := newMergingIter(c.overlapFiles, d.openIter)
	defer func() {
		_ = iter.Close()
	}()

	iterations := 0
	for iter.Seek(c.startKey); iter.Valid(); iter.Next() {
		if bytes.Compare(iter.UnsafeKey(), c.endKey) >= 0 {
			break
		}
		iterations++
		if iterations%1000 == 0 && ctx.Err() != nil {
			return nil, errors.Trace(ctx.Err())
		}
		if err := sw.Set(iter.UnsafeKey(), iter.UnsafeValue()); err != nil {
			return nil, err
		}
	}
	if err := iter.Error(); err != nil {
		return nil, err
	}
	err = sw.Close()
	swClosed = true
	if err != nil {
		return nil, err
	}
	return merged, nil
}

func (d *DiskSorter) openIter(file *fileMetadata) (Iterator, error) {
	reader, err := d.readerPool.get(file.fileNum)
	if err != nil {
		return nil, errors.Trace(err)
	}
	iter, err := reader.NewIter(nil, nil)
	if err != nil {
		_ = reader.Close()
		return nil, errors.Trace(err)
	}
	return &sstIter{
		iter: iter,
		onClose: func() error {
			return d.readerPool.unref(file.fileNum)
		},
	}, nil
}

// IsSorted implements the ExternalSorter.IsSorted.
func (d *DiskSorter) IsSorted() bool {
	return d.state.Load() == diskSorterStateSorted
}

// NewIterator implements the ExternalSorter.NewIterator.
func (d *DiskSorter) NewIterator(_ context.Context) (Iterator, error) {
	if d.state.Load() != diskSorterStateSorted {
		return nil, errors.Errorf("diskSorter is not sorted")
	}
	return newMergingIter(d.orderedFiles, d.openIter), nil
}

// Close implements the ExternalSorter.Close.
func (d *DiskSorter) Close() error {
	d.cache.Unref()
	return nil
}

// CloseAndCleanup implements the ExternalSorter.CloseAndCleanup.
func (d *DiskSorter) CloseAndCleanup() error {
	d.cache.Unref()
	err := d.fs.RemoveAll(d.dirname)
	return errors.Trace(err)
}

type keyValue struct {
	key   []byte
	value []byte
}

type diskSorterWriter struct {
	kvs          []keyValue
	buf          []byte
	off          int
	newSSTWriter func() (*sstWriter, error)
}
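
// Put copies key and value into the writer's internal buffer, so the caller
// may reuse its own slices after Put returns. When the buffered data no longer
// fits, the accumulated key-value pairs are sorted and flushed to a new SST
// file before the new pair is recorded.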
func (w *diskSorterWriter) Put(key, value []byte) error {
	if w.off+len(key)+len(value) > len(w.buf) {
		if err := w.flush(); err != nil {
			return errors.Trace(err)
		}
		// The buffer is too small to hold this single key-value pair; enlarge it.
		// w.off must be 0 after flush.
		if len(key)+len(value) > len(w.buf) {
			w.buf = make([]byte, len(key)+len(value))
		}
	}

	var kv keyValue
	kv.key = w.buf[w.off : w.off+len(key)]
	w.off += len(key)
	copy(kv.key, key)
	if len(value) > 0 {
		kv.value = w.buf[w.off : w.off+len(value)]
		w.off += len(value)
		copy(kv.value, value)
	}
	w.kvs = append(w.kvs, kv)
	return nil
}

func (w *diskSorterWriter) Flush() error {
	if len(w.kvs) == 0 {
		return nil
	}
	return w.flush()
}

func (w *diskSorterWriter) flush() error {
	sw, err := w.newSSTWriter()
	if err != nil {
		return err
	}

	slices.SortFunc(w.kvs, func(a, b keyValue) int {
		return bytes.Compare(a.key, b.key)
	})

	// Dedup keys before writing them into the SST file.
	// NOTE: keys must be sorted and deduplicated when constructing an SST file.
	var lastKey []byte
	for _, kv := range w.kvs {
		if bytes.Equal(lastKey, kv.key) {
			continue
		}
		lastKey = kv.key
		if err := sw.Set(kv.key, kv.value); err != nil {
			_ = sw.Close()
			return err
		}
	}
	if err := sw.Close(); err != nil {
		return err
	}
	w.kvs = w.kvs[:0]
	w.off = 0
	return nil
}

func (w *diskSorterWriter) Close() error {
	return w.Flush()
}