// tidb/pkg/lightning/backend/local/engine.go

// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"bytes"
"container/heap"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"slices"
"sync"
"time"
"unsafe"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/google/btree"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/pkg/ingestor/engineapi"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/lightning/membuf"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/util/hack"
tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var (
engineMetaKey = []byte{0, 'm', 'e', 't', 'a'}
normalIterStartKey = []byte{1}
)
type importMutexState uint32
const (
importMutexStateImport importMutexState = 1 << iota
importMutexStateClose
// importMutexStateReadLock is a special state: in this state we lock the engine with a read lock
// and add isImportingAtomic by this value. In the other states, we directly store the state value,
// so this must always be the last value of this enum.
importMutexStateReadLock
// we need to lock the engine when it's opened as we do when it's closed, otherwise GetEngineSize may race with OpenEngine
importMutexStateOpen
)
const (
// DupDetectDirSuffix is used by pre-deduplication to store the encoded index KV.
DupDetectDirSuffix = ".dupdetect"
// DupResultDirSuffix is used by pre-deduplication to store the duplicated row ID.
DupResultDirSuffix = ".dupresult"
)
// engineMeta contains some fields that are necessary to continue the engine restore/import process.
// These fields should be written to disk when we update the chunk checkpoint.
type engineMeta struct {
TS uint64 `json:"ts"`
// Length is the number of KV pairs stored by the engine.
Length atomic.Int64 `json:"length"`
// TotalSize is the total pre-compressed KV byte size stored by engine.
TotalSize atomic.Int64 `json:"total_size"`
}
// Engine is a local engine.
type Engine struct {
engineMeta
closed atomic.Bool
db atomic.Pointer[pebble.DB]
UUID uuid.UUID
localWriters sync.Map
regionSplitSize int64
regionSplitKeyCnt int64
regionSplitKeysCache [][]byte
// isImportingAtomic is an atomic variable indicating whether this engine is importing.
// This should not be used as a "spin lock" indicator.
isImportingAtomic atomic.Uint32
// flush and ingest SST hold the read lock; other operations hold the write lock.
mutex sync.RWMutex
ctx context.Context
cancel context.CancelFunc
sstDir string
sstMetasChan chan metaOrFlush
ingestErr common.OnceError
wg sync.WaitGroup
sstIngester sstIngester
// sst seq lock
seqLock sync.Mutex
// seq number for incoming sst meta
nextSeq int32
// max seq of sst metas ingested into pebble
finishedMetaSeq atomic.Int32
config backend.LocalEngineConfig
tableInfo *checkpoints.TidbTableInfo
dupDetectOpt common.DupDetectOpt
// total size of SST files waiting to be ingested
pendingFileSize atomic.Int64
// statistics of KVs imported into TiKV, updated via Finish.
importedKVSize atomic.Int64
importedKVCount atomic.Int64
keyAdapter common.KeyAdapter
duplicateDetection bool
duplicateDB *pebble.DB
logger log.Logger
}
var _ engineapi.Engine = (*Engine)(nil)
func (e *Engine) setError(err error) {
if err != nil {
e.ingestErr.Set(err)
e.cancel()
}
}
func (e *Engine) getDB() *pebble.DB {
return e.db.Load()
}
// Close closes the engine and releases all resources.
func (e *Engine) Close() error {
e.logger.Debug("closing local engine", zap.Stringer("engine", e.UUID), zap.Stack("stack"))
db := e.getDB()
if db == nil {
return nil
}
err := errors.Trace(db.Close())
e.db.Store(nil)
return err
}
// Cleanup removes the meta, db and duplicate detection files.
func (e *Engine) Cleanup(dataDir string) error {
if err := os.RemoveAll(e.sstDir); err != nil {
return errors.Trace(err)
}
uuid := e.UUID.String()
if err := os.RemoveAll(filepath.Join(dataDir, uuid+DupDetectDirSuffix)); err != nil {
return errors.Trace(err)
}
if err := os.RemoveAll(filepath.Join(dataDir, uuid+DupResultDirSuffix)); err != nil {
return errors.Trace(err)
}
dbPath := filepath.Join(dataDir, uuid)
return errors.Trace(os.RemoveAll(dbPath))
}
// Exist checks if the db folder exists (the meta sometimes isn't flushed before lightning exits)
func (e *Engine) Exist(dataDir string) error {
dbPath := filepath.Join(dataDir, e.UUID.String())
if _, err := os.Stat(dbPath); err != nil {
return err
}
return nil
}
func isStateLocked(state importMutexState) bool {
return state&(importMutexStateClose|importMutexStateImport) != 0
}
func (e *Engine) isLocked() bool {
// the engine is locked only in import or close state.
return isStateLocked(importMutexState(e.isImportingAtomic.Load()))
}
// rLock locks the local file in shared read state. Only used for flushing and ingesting SST files.
func (e *Engine) rLock() {
e.mutex.RLock()
e.isImportingAtomic.Add(uint32(importMutexStateReadLock))
}
func (e *Engine) rUnlock() {
if e == nil {
return
}
e.isImportingAtomic.Sub(uint32(importMutexStateReadLock))
e.mutex.RUnlock()
}
// lock locks the local file for importing.
func (e *Engine) lock(state importMutexState) {
e.mutex.Lock()
e.isImportingAtomic.Store(uint32(state))
}
// lockUnless tries to lock the local file unless it is already locked into the state given by
// ignoreStateMask. Returns whether the lock is successful.
func (e *Engine) lockUnless(newState, ignoreStateMask importMutexState) bool {
curState := e.isImportingAtomic.Load()
if curState&uint32(ignoreStateMask) != 0 {
return false
}
e.lock(newState)
return true
}
// tryRLock tries to read-lock the local file unless it is already write locked.
// Returns whether the lock is successful.
func (e *Engine) tryRLock() bool {
curState := e.isImportingAtomic.Load()
// engine is in import/close state.
if isStateLocked(importMutexState(curState)) {
return false
}
e.rLock()
return true
}
func (e *Engine) unlock() {
if e == nil {
return
}
e.isImportingAtomic.Store(0)
e.mutex.Unlock()
}
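// Locking summary: rLock/rUnlock take the shared path used by flush and SST ingest,
// while lock/unlock take the exclusive path used by import, close and open.
// isImportingAtomic mirrors the current state so that callers such as
// getEngineFileSize can report IsImporting without touching the mutex.
// A rough caller-side sketch (illustrative only, the handling of a failed try is made up):
//
//	if !e.tryRLock() {
//		return // engine is currently being imported or closed
//	}
//	defer e.rUnlock()
//	// ... flush or ingest SSTs here ...
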
var sizeOfKVPair = int64(unsafe.Sizeof(common.KvPair{}))
// TotalMemorySize returns the total memory size of the engine.
func (e *Engine) TotalMemorySize() int64 {
var memSize int64
e.localWriters.Range(func(k, _ any) bool {
w := k.(*Writer)
if w.kvBuffer != nil {
w.Lock()
memSize += w.kvBuffer.TotalSize()
w.Unlock()
}
w.Lock()
memSize += sizeOfKVPair * int64(cap(w.writeBatch))
w.Unlock()
return true
})
return memSize
}
// KVStatistics returns the total kv size and total kv count.
func (e *Engine) KVStatistics() (totalSize int64, totalKVCount int64) {
return e.TotalSize.Load(), e.Length.Load()
}
// ImportedStatistics returns the imported kv size and imported kv count.
func (e *Engine) ImportedStatistics() (importedSize int64, importedKVCount int64) {
return e.importedKVSize.Load(), e.importedKVCount.Load()
}
// ConflictInfo implements common.Engine.
func (*Engine) ConflictInfo() engineapi.ConflictInfo {
return engineapi.ConflictInfo{}
}
// ID is the identifier of an engine.
func (e *Engine) ID() string {
return e.UUID.String()
}
// GetKeyRange implements common.Engine.
func (e *Engine) GetKeyRange() (startKey []byte, endKey []byte, err error) {
firstKey, lastKey, err := e.GetFirstAndLastKey(nil, nil)
if err != nil {
return nil, nil, errors.Trace(err)
}
return firstKey, nextKey(lastKey), nil
}
// GetRegionSplitKeys implements common.Engine.
func (e *Engine) GetRegionSplitKeys() ([][]byte, error) {
return e.getRegionSplitKeys(e.regionSplitSize, e.regionSplitKeyCnt)
}
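// getRegionSplitKeys derives split keys from the size properties sampled at SST
// build time: the engine key range is cut into ranges of roughly regionSplitSize
// bytes / regionSplitKeyCnt keys, and the returned slice contains every range
// start plus the end key of the last range. The result is cached in
// regionSplitKeysCache so LoadIngestData can reuse it.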
func (e *Engine) getRegionSplitKeys(regionSplitSize, regionSplitKeyCnt int64) ([][]byte, error) {
sizeProps, err := getSizePropertiesFn(e.logger, e.getDB(), e.keyAdapter)
if err != nil {
return nil, errors.Trace(err)
}
startKey, endKey, err := e.GetKeyRange()
if err != nil {
return nil, errors.Trace(err)
}
ranges := splitRangeBySizeProps(
engineapi.Range{Start: startKey, End: endKey},
sizeProps,
regionSplitSize,
regionSplitKeyCnt,
)
keys := make([][]byte, 0, len(ranges)+1)
for _, r := range ranges {
keys = append(keys, r.Start)
}
keys = append(keys, ranges[len(ranges)-1].End)
e.regionSplitKeysCache = keys
return keys, nil
}
type rangeOffsets struct {
Size uint64
Keys uint64
}
type rangeProperty struct {
Key []byte
rangeOffsets
}
// Less implements btree.Item interface.
func (r *rangeProperty) Less(than btree.Item) bool {
ta := than.(*rangeProperty)
return bytes.Compare(r.Key, ta.Key) < 0
}
var _ btree.Item = &rangeProperty{}
type rangeProperties []rangeProperty
// Encode encodes the range properties into a byte slice.
func (r rangeProperties) Encode() []byte {
b := make([]byte, 0, 1024)
idx := 0
for _, p := range r {
b = append(b, 0, 0, 0, 0)
binary.BigEndian.PutUint32(b[idx:], uint32(len(p.Key)))
idx += 4
b = append(b, p.Key...)
idx += len(p.Key)
b = append(b, 0, 0, 0, 0, 0, 0, 0, 0)
binary.BigEndian.PutUint64(b[idx:], p.Size)
idx += 8
b = append(b, 0, 0, 0, 0, 0, 0, 0, 0)
binary.BigEndian.PutUint64(b[idx:], p.Keys)
idx += 8
}
return b
}
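// Each rangeProperty above is encoded as a fixed big-endian record:
//
//	| 4-byte key length | key bytes | 8-byte Size | 8-byte Keys |
//
// For example, a property with Key "ab", Size 10 and Keys 2 encodes to
// 00 00 00 02 'a' 'b' 00 00 00 00 00 00 00 0a 00 00 00 00 00 00 00 02.
// decodeRangeProperties below parses this layout back.
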
// RangePropertiesCollector collects range properties for each range.
type RangePropertiesCollector struct {
props rangeProperties
lastOffsets rangeOffsets
lastKey []byte
currentOffsets rangeOffsets
propSizeIdxDistance uint64
propKeysIdxDistance uint64
}
func newRangePropertiesCollector() pebble.TablePropertyCollector {
return &RangePropertiesCollector{
props: make([]rangeProperty, 0, 1024),
propSizeIdxDistance: defaultPropSizeIndexDistance,
propKeysIdxDistance: defaultPropKeysIndexDistance,
}
}
func (c *RangePropertiesCollector) sizeInLastRange() uint64 {
return c.currentOffsets.Size - c.lastOffsets.Size
}
func (c *RangePropertiesCollector) keysInLastRange() uint64 {
return c.currentOffsets.Keys - c.lastOffsets.Keys
}
func (c *RangePropertiesCollector) insertNewPoint(key []byte) {
c.lastOffsets = c.currentOffsets
c.props = append(c.props, rangeProperty{Key: slices.Clone(key), rangeOffsets: c.currentOffsets})
}
// Add implements `pebble.TablePropertyCollector`.
func (c *RangePropertiesCollector) Add(key pebble.InternalKey, value []byte) error {
if key.Kind() != pebble.InternalKeyKindSet || bytes.Equal(key.UserKey, engineMetaKey) {
return nil
}
c.currentOffsets.Size += uint64(len(value)) + uint64(len(key.UserKey))
c.currentOffsets.Keys++
if len(c.lastKey) == 0 || c.sizeInLastRange() >= c.propSizeIdxDistance ||
c.keysInLastRange() >= c.propKeysIdxDistance {
c.insertNewPoint(key.UserKey)
}
c.lastKey = append(c.lastKey[:0], key.UserKey...)
return nil
}
// Finish implements `pebble.TablePropertyCollector`.
func (c *RangePropertiesCollector) Finish(userProps map[string]string) error {
if c.sizeInLastRange() > 0 || c.keysInLastRange() > 0 {
c.insertNewPoint(c.lastKey)
}
userProps[propRangeIndex] = string(c.props.Encode())
return nil
}
// Name implements `pebble.TablePropertyCollector`.
func (*RangePropertiesCollector) Name() string {
return propRangeIndex
}
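// sizeProperties aggregates the sampled range properties from every SST in the
// pebble DB into one btree keyed by user key, so that splitRangeBySizeProps can
// estimate how many bytes/keys fall into an arbitrary key range. Each stored
// rangeProperty holds the delta for its range rather than the cumulative offsets.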
type sizeProperties struct {
totalSize uint64
indexHandles *btree.BTree
}
func newSizeProperties() *sizeProperties {
return &sizeProperties{indexHandles: btree.New(32)}
}
func (s *sizeProperties) add(item *rangeProperty) {
if old := s.indexHandles.ReplaceOrInsert(item); old != nil {
o := old.(*rangeProperty)
item.Keys += o.Keys
item.Size += o.Size
}
}
func (s *sizeProperties) addAll(props rangeProperties) {
prevRange := rangeOffsets{}
for _, r := range props {
s.add(&rangeProperty{
Key: r.Key,
rangeOffsets: rangeOffsets{Keys: r.Keys - prevRange.Keys, Size: r.Size - prevRange.Size},
})
prevRange = r.rangeOffsets
}
if len(props) > 0 {
s.totalSize += props[len(props)-1].Size
}
}
// iter iterates the tree until f returns false
func (s *sizeProperties) iter(f func(p *rangeProperty) bool) {
s.indexHandles.Ascend(func(i btree.Item) bool {
prop := i.(*rangeProperty)
return f(prop)
})
}
func decodeRangeProperties(data []byte, keyAdapter common.KeyAdapter) (rangeProperties, error) {
r := make(rangeProperties, 0, 16)
for len(data) > 0 {
if len(data) < 4 {
return nil, io.ErrUnexpectedEOF
}
keyLen := int(binary.BigEndian.Uint32(data[:4]))
data = data[4:]
if len(data) < keyLen+8*2 {
return nil, io.ErrUnexpectedEOF
}
key := data[:keyLen]
data = data[keyLen:]
size := binary.BigEndian.Uint64(data[:8])
keys := binary.BigEndian.Uint64(data[8:])
data = data[16:]
if !bytes.Equal(key, engineMetaKey) {
userKey, err := keyAdapter.Decode(nil, key)
if err != nil {
return nil, errors.Annotate(err, "failed to decode key with keyAdapter")
}
r = append(r, rangeProperty{Key: userKey, rangeOffsets: rangeOffsets{Size: size, Keys: keys}})
}
}
return r, nil
}
// getSizePropertiesFn is used to let unit test replace the real function.
var getSizePropertiesFn = getSizeProperties
func getSizeProperties(logger log.Logger, db *pebble.DB, keyAdapter common.KeyAdapter) (*sizeProperties, error) {
sstables, err := db.SSTables(pebble.WithProperties())
if err != nil {
logger.Warn("get sst table properties failed", log.ShortError(err))
return nil, errors.Trace(err)
}
sizeProps := newSizeProperties()
for _, level := range sstables {
for _, info := range level {
if prop, ok := info.Properties.UserProperties[propRangeIndex]; ok {
data := hack.Slice(prop)
rangeProps, err := decodeRangeProperties(data, keyAdapter)
if err != nil {
logger.Warn("decodeRangeProperties failed",
zap.Stringer("fileNum", info.FileNum), log.ShortError(err))
return nil, errors.Trace(err)
}
sizeProps.addAll(rangeProps)
}
}
}
return sizeProps, nil
}
func (e *Engine) getEngineFileSize() backend.EngineFileSize {
db := e.getDB()
var total pebble.LevelMetrics
if db != nil {
metrics := db.Metrics()
total = metrics.Total()
}
var memSize int64
e.localWriters.Range(func(k, _ any) bool {
w := k.(*Writer)
memSize += int64(w.EstimatedSize())
return true
})
pendingSize := e.pendingFileSize.Load()
// TODO: should also add the in-processing compaction sst writer size into MemSize
return backend.EngineFileSize{
UUID: e.UUID,
DiskSize: total.Size + pendingSize,
MemSize: memSize,
IsImporting: e.isLocked(),
}
}
// either a sstMeta or a flush message
type metaOrFlush struct {
meta *sstMeta
flushCh chan struct{}
}
type metaSeq struct {
// the sequence for this flush message; a flush call can return only if
// all the other flushes with a lower `flushSeq` are done
flushSeq int32
// the max sstMeta sequence number in this flush; after the flush is done (all SSTs are ingested),
// we can safely save chunks with a lower meta sequence number.
metaSeq int32
}
type metaSeqHeap struct {
arr []metaSeq
}
// Len returns the number of items in the priority queue.
func (h *metaSeqHeap) Len() int {
return len(h.arr)
}
// Less reports whether the element with index i must sort before the element with index j.
func (h *metaSeqHeap) Less(i, j int) bool {
return h.arr[i].flushSeq < h.arr[j].flushSeq
}
// Swap swaps the items at the passed indices.
func (h *metaSeqHeap) Swap(i, j int) {
h.arr[i], h.arr[j] = h.arr[j], h.arr[i]
}
// Push pushes the item onto the priority queue.
func (h *metaSeqHeap) Push(x any) {
h.arr = append(h.arr, x.(metaSeq))
}
// Pop removes the minimum item (according to Less) from the priority queue
func (h *metaSeqHeap) Pop() any {
item := h.arr[len(h.arr)-1]
h.arr = h.arr[:len(h.arr)-1]
return item
}
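// metaSeqHeap is consumed through container/heap, roughly like this (illustrative values):
//
//	h := &metaSeqHeap{}
//	heap.Push(h, metaSeq{flushSeq: 3})
//	heap.Push(h, metaSeq{flushSeq: 1})
//	smallest := heap.Pop(h).(metaSeq) // smallest.flushSeq == 1
//
// ingestSSTLoop (below) is the single reader of sstMetasChan. When compaction is
// enabled it batches incoming sstMetas until CompactThreshold is reached (or a
// flush arrives), hands each batch with a sequence number to up to
// CompactConcurrency worker goroutines (one when compaction is disabled) that
// optionally merge the SSTs and ingest them into pebble, and signals a flush
// channel only once every batch with a lower sequence number has been ingested;
// out-of-order completions are parked in the inSyncSeqs heap until the gap closes.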
func (e *Engine) ingestSSTLoop() {
defer e.wg.Done()
type flushSeq struct {
seq int32
ch chan struct{}
}
seq := atomic.NewInt32(0)
finishedSeq := atomic.NewInt32(0)
var seqLock sync.Mutex
// a flush is finished iff all the compaction&ingest tasks with a lower seq number are finished.
flushQueue := make([]flushSeq, 0)
// inSyncSeqs is a heap that stores all the finished compaction tasks whose seq is bigger than `finishedSeq + 1`,
// which means there is still at least one compaction task with a lower seq unfinished.
inSyncSeqs := &metaSeqHeap{arr: make([]metaSeq, 0)}
type metaAndSeq struct {
metas []*sstMeta
seq int32
}
concurrency := e.config.CompactConcurrency
// when compaction is disabled, ingest is a serial action, so 1 goroutine is enough
if !e.config.Compact {
concurrency = 1
}
metaChan := make(chan metaAndSeq, concurrency)
for range concurrency {
e.wg.Add(1)
go func() {
defer func() {
if e.ingestErr.Get() != nil {
seqLock.Lock()
for _, f := range flushQueue {
f.ch <- struct{}{}
}
flushQueue = flushQueue[:0]
seqLock.Unlock()
}
e.wg.Done()
}()
for {
select {
case <-e.ctx.Done():
return
case metas, ok := <-metaChan:
if !ok {
return
}
ingestMetas := metas.metas
if e.config.Compact {
newMeta, err := e.sstIngester.mergeSSTs(metas.metas, e.sstDir, e.config.BlockSize)
if err != nil {
e.setError(err)
return
}
ingestMetas = []*sstMeta{newMeta}
}
// batchIngestSSTs will change ingestMetas' order, so we record the max seq here
metasMaxSeq := ingestMetas[len(ingestMetas)-1].seq
if err := e.batchIngestSSTs(ingestMetas); err != nil {
e.setError(err)
return
}
seqLock.Lock()
finSeq := finishedSeq.Load()
if metas.seq == finSeq+1 {
finSeq = metas.seq
finMetaSeq := metasMaxSeq
for len(inSyncSeqs.arr) > 0 {
if inSyncSeqs.arr[0].flushSeq != finSeq+1 {
break
}
finSeq++
finMetaSeq = inSyncSeqs.arr[0].metaSeq
heap.Remove(inSyncSeqs, 0)
}
var flushChans []chan struct{}
for _, seq := range flushQueue {
if seq.seq > finSeq {
break
}
flushChans = append(flushChans, seq.ch)
}
flushQueue = flushQueue[len(flushChans):]
finishedSeq.Store(finSeq)
e.finishedMetaSeq.Store(finMetaSeq)
seqLock.Unlock()
for _, c := range flushChans {
c <- struct{}{}
}
} else {
heap.Push(inSyncSeqs, metaSeq{flushSeq: metas.seq, metaSeq: metasMaxSeq})
seqLock.Unlock()
}
}
}
}()
}
compactAndIngestSSTs := func(metas []*sstMeta) {
if len(metas) > 0 {
seqLock.Lock()
metaSeq := seq.Add(1)
seqLock.Unlock()
select {
case <-e.ctx.Done():
case metaChan <- metaAndSeq{metas: metas, seq: metaSeq}:
}
}
}
pendingMetas := make([]*sstMeta, 0, 16)
totalSize := int64(0)
metasTmp := make([]*sstMeta, 0)
addMetas := func() {
if len(metasTmp) == 0 {
return
}
metas := metasTmp
metasTmp = make([]*sstMeta, 0, len(metas))
if !e.config.Compact {
compactAndIngestSSTs(metas)
return
}
for _, m := range metas {
if m.totalCount > 0 {
pendingMetas = append(pendingMetas, m)
totalSize += m.totalSize
if totalSize >= e.config.CompactThreshold {
compactMetas := pendingMetas
pendingMetas = make([]*sstMeta, 0, len(pendingMetas))
totalSize = 0
compactAndIngestSSTs(compactMetas)
}
}
}
}
readMetaLoop:
for {
closed := false
select {
case <-e.ctx.Done():
close(metaChan)
return
case m, ok := <-e.sstMetasChan:
if !ok {
closed = true
break
}
if m.flushCh != nil {
// met a flush event, we should trigger an ingest task if there are pending metas,
// and then wait for all the running flush tasks to be done.
if len(metasTmp) > 0 {
addMetas()
}
if len(pendingMetas) > 0 {
seqLock.Lock()
metaSeq := seq.Add(1)
flushQueue = append(flushQueue, flushSeq{ch: m.flushCh, seq: metaSeq})
seqLock.Unlock()
select {
case metaChan <- metaAndSeq{metas: pendingMetas, seq: metaSeq}:
case <-e.ctx.Done():
close(metaChan)
return
}
pendingMetas = make([]*sstMeta, 0, len(pendingMetas))
totalSize = 0
} else {
// no remaining metas need to be ingested
seqLock.Lock()
curSeq := seq.Load()
finSeq := finishedSeq.Load()
// if all pending SST files are written, directly do a db.Flush
if curSeq == finSeq {
seqLock.Unlock()
m.flushCh <- struct{}{}
} else {
// waiting for pending compaction tasks
flushQueue = append(flushQueue, flushSeq{ch: m.flushCh, seq: curSeq})
seqLock.Unlock()
}
}
continue readMetaLoop
}
metasTmp = append(metasTmp, m.meta)
// try to drain all the sst metas from the chan to make sure all the SSTs are processed before handling a flush msg.
if len(e.sstMetasChan) > 0 {
continue readMetaLoop
}
addMetas()
}
if closed {
compactAndIngestSSTs(pendingMetas)
close(metaChan)
return
}
}
}
func (e *Engine) addSST(ctx context.Context, m *sstMeta) (int32, error) {
// set pending size after SST file is generated
e.pendingFileSize.Add(m.fileSize)
// make sure sstMeta is sent into the chan in order
e.seqLock.Lock()
defer e.seqLock.Unlock()
e.nextSeq++
seq := e.nextSeq
m.seq = seq
select {
case e.sstMetasChan <- metaOrFlush{meta: m}:
case <-ctx.Done():
return 0, ctx.Err()
case <-e.ctx.Done():
}
return seq, e.ingestErr.Get()
}
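// batchIngestSSTs sorts the SSTs by minKey and greedily packs them into groups of
// non-overlapping files; each group is then ingested with a single ingest call.
// For example, SSTs covering [a,c], [b,d] and [e,f] become group 1 = {[a,c], [e,f]}
// and group 2 = {[b,d]}, because [b,d] overlaps the tail of group 1 at the moment
// it is considered.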
func (e *Engine) batchIngestSSTs(metas []*sstMeta) error {
if len(metas) == 0 {
return nil
}
slices.SortFunc(metas, func(i, j *sstMeta) int {
return bytes.Compare(i.minKey, j.minKey)
})
// non-overlapping SSTs are grouped together, and each group is ingested in order
metaLevels := make([][]*sstMeta, 0)
for _, meta := range metas {
inserted := false
for i, l := range metaLevels {
if bytes.Compare(l[len(l)-1].maxKey, meta.minKey) >= 0 {
continue
}
metaLevels[i] = append(l, meta)
inserted = true
break
}
if !inserted {
metaLevels = append(metaLevels, []*sstMeta{meta})
}
}
for _, l := range metaLevels {
if err := e.ingestSSTs(l); err != nil {
return err
}
}
return nil
}
func (e *Engine) ingestSSTs(metas []*sstMeta) error {
// use raw RLock to avoid changing the lock state during flushing.
e.mutex.RLock()
defer e.mutex.RUnlock()
if e.closed.Load() {
return errorEngineClosed
}
totalSize := int64(0)
totalCount := int64(0)
fileSize := int64(0)
for _, m := range metas {
totalSize += m.totalSize
totalCount += m.totalCount
fileSize += m.fileSize
}
e.logger.Info("write data to local DB",
zap.Int64("size", totalSize),
zap.Int64("kvs", totalCount),
zap.Int("files", len(metas)),
zap.Int64("sstFileSize", fileSize),
zap.String("file", metas[0].path),
logutil.Key("firstKey", metas[0].minKey),
logutil.Key("lastKey", metas[len(metas)-1].maxKey))
if err := e.sstIngester.ingest(metas); err != nil {
return errors.Trace(err)
}
count := int64(0)
size := int64(0)
for _, m := range metas {
count += m.totalCount
size += m.totalSize
}
e.Length.Add(count)
e.TotalSize.Add(size)
return nil
}
func (e *Engine) flushLocalWriters(parentCtx context.Context) error {
eg, ctx := errgroup.WithContext(parentCtx)
e.localWriters.Range(func(k, _ any) bool {
eg.Go(func() error {
w := k.(*Writer)
return w.flush(ctx)
})
return true
})
return eg.Wait()
}
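// flushEngineWithoutLock flushes in four steps: flush every local writer so their
// in-memory KVs become SST files, send a flush message through sstMetasChan and
// wait until all previously queued SSTs are ingested, persist the engine meta into
// pebble, and finally wait for an async pebble memtable flush. As the name
// suggests, taking the engine lock is left to the caller.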
func (e *Engine) flushEngineWithoutLock(ctx context.Context) error {
if err := e.flushLocalWriters(ctx); err != nil {
return err
}
flushChan := make(chan struct{}, 1)
select {
case e.sstMetasChan <- metaOrFlush{flushCh: flushChan}:
case <-ctx.Done():
return ctx.Err()
case <-e.ctx.Done():
return e.ctx.Err()
}
select {
case <-flushChan:
case <-ctx.Done():
return ctx.Err()
case <-e.ctx.Done():
return e.ctx.Err()
}
if err := e.ingestErr.Get(); err != nil {
return errors.Trace(err)
}
if err := e.saveEngineMeta(); err != nil {
return err
}
flushFinishedCh, err := e.getDB().AsyncFlush()
if err != nil {
return errors.Trace(err)
}
select {
case <-flushFinishedCh:
return nil
case <-ctx.Done():
return ctx.Err()
case <-e.ctx.Done():
return e.ctx.Err()
}
}
func saveEngineMetaToDB(meta *engineMeta, db *pebble.DB) error {
jsonBytes, err := json.Marshal(meta)
if err != nil {
return errors.Trace(err)
}
// note: we can't set Sync to true since we disabled WAL.
return db.Set(engineMetaKey, jsonBytes, &pebble.WriteOptions{Sync: false})
}
// saveEngineMeta saves the metadata about the DB into the DB itself.
// This method should be followed by a Flush to ensure the data is actually synchronized
func (e *Engine) saveEngineMeta() error {
e.logger.Debug("save engine meta", zap.Stringer("uuid", e.UUID), zap.Int64("count", e.Length.Load()),
zap.Int64("size", e.TotalSize.Load()))
return errors.Trace(saveEngineMetaToDB(&e.engineMeta, e.getDB()))
}
func (e *Engine) loadEngineMeta() error {
jsonBytes, closer, err := e.getDB().Get(engineMetaKey)
if err != nil {
if err == pebble.ErrNotFound {
e.logger.Debug("local db missing engine meta", zap.Stringer("uuid", e.UUID), log.ShortError(err))
return nil
}
return err
}
//nolint: errcheck
defer closer.Close()
if err = json.Unmarshal(jsonBytes, &e.engineMeta); err != nil {
e.logger.Warn("local db failed to deserialize meta", zap.Stringer("uuid", e.UUID), zap.ByteString("content", jsonBytes), zap.Error(err))
return err
}
e.logger.Debug("load engine meta", zap.Stringer("uuid", e.UUID), zap.Int64("count", e.Length.Load()),
zap.Int64("size", e.TotalSize.Load()))
return nil
}
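// newKVIter creates an iterator over the engine's pebble DB. The lower bound is
// clamped to normalIterStartKey ({1}) so the iterator never yields the engine meta
// record, whose key starts with byte 0. When duplicate detection is enabled, the
// plain pebble iterator is replaced by a dupDetectIter that decodes keys through
// keyAdapter and records detected duplicates via duplicateDB.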
func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions, buf *membuf.Buffer) IngestLocalEngineIter {
if bytes.Compare(opts.LowerBound, normalIterStartKey) < 0 {
newOpts := *opts
newOpts.LowerBound = normalIterStartKey
opts = &newOpts
}
if !e.duplicateDetection {
iter, err := e.getDB().NewIter(opts)
if err != nil {
e.logger.Panic("fail to create iterator")
return nil
}
return &pebbleIter{Iterator: iter, buf: buf}
}
logger := log.Wrap(tidblogutil.Logger(ctx)).With(
zap.String("table", common.UniqueTable(e.tableInfo.DB, e.tableInfo.Name)),
zap.Int64("tableID", e.tableInfo.ID),
zap.Stringer("engineUUID", e.UUID))
return newDupDetectIter(
e.getDB(),
e.keyAdapter,
opts,
e.duplicateDB,
logger,
e.dupDetectOpt,
buf,
)
}
var _ engineapi.IngestData = (*Engine)(nil)
// GetFirstAndLastKey reads the first and last key in range [lowerBound, upperBound)
// in the engine. Empty upperBound means unbounded.
func (e *Engine) GetFirstAndLastKey(lowerBound, upperBound []byte) (firstKey, lastKey []byte, err error) {
if len(upperBound) == 0 {
// the caller uses an empty slice to mean an unbounded upper bound, but pebble
// expects nil for that, so reset it to nil
upperBound = nil
}
opt := &pebble.IterOptions{
LowerBound: lowerBound,
UpperBound: upperBound,
}
failpoint.Inject("mockGetFirstAndLastKey", func() {
failpoint.Return(lowerBound, upperBound, nil)
})
iter := e.newKVIter(context.Background(), opt, nil)
//nolint: errcheck
defer iter.Close()
// Need to seek to the first key because NewIter returns an unpositioned iterator
hasKey := iter.First()
if iter.Error() != nil {
return nil, nil, errors.Annotate(iter.Error(), "failed to read the first key")
}
if !hasKey {
return nil, nil, nil
}
firstKey = slices.Clone(iter.Key())
iter.Last()
if iter.Error() != nil {
return nil, nil, errors.Annotate(iter.Error(), "failed to seek to the last key")
}
lastKey = slices.Clone(iter.Key())
return firstKey, lastKey, nil
}
// NewIter implements IngestData interface.
func (e *Engine) NewIter(
ctx context.Context,
lowerBound, upperBound []byte,
bufPool *membuf.Pool,
) engineapi.ForwardIter {
return e.newKVIter(
ctx,
&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound},
bufPool.NewBuffer(),
)
}
// GetTS implements IngestData interface.
func (e *Engine) GetTS() uint64 {
return e.TS
}
// IncRef implements IngestData interface.
func (*Engine) IncRef() {}
// DecRef implements IngestData interface.
func (*Engine) DecRef() {}
// Finish implements IngestData interface.
func (e *Engine) Finish(totalBytes, totalCount int64) {
e.importedKVSize.Add(totalBytes)
e.importedKVCount.Add(totalCount)
}
// LoadIngestData returns the (local) Engine itself because Engine has implemented
// the IngestData interface.
func (e *Engine) LoadIngestData(
ctx context.Context,
outCh chan<- engineapi.DataAndRanges,
) (err error) {
jobRangeKeys := e.regionSplitKeysCache
// when the region is large, we need to split to smaller job ranges to increase
// the concurrency.
if jobRangeKeys == nil || e.regionSplitSize > 2*int64(config.SplitRegionSize) {
e.regionSplitKeysCache = nil
jobRangeKeys, err = e.getRegionSplitKeys(
int64(config.SplitRegionSize), int64(config.SplitRegionKeys),
)
if err != nil {
return errors.Trace(err)
}
}
prev := jobRangeKeys[0]
for i := 1; i < len(jobRangeKeys); i++ {
cur := jobRangeKeys[i]
select {
case <-ctx.Done():
return ctx.Err()
case outCh <- engineapi.DataAndRanges{
Data: e,
SortedRanges: []engineapi.Range{{Start: prev, End: cur}},
}:
}
prev = cur
}
return nil
}
type sstMeta struct {
path string
minKey []byte
maxKey []byte
totalSize int64
totalCount int64
// used to calculate disk quota
fileSize int64
seq int32
}
// Writer is used to write data into an SST file.
type Writer struct {
sync.Mutex
engine *Engine
memtableSizeLimit int64
// if the KVs are appended in order, we can write them directly into the SST file;
// otherwise we must first store them in writeBatch and then batch-flush them into the SST file.
isKVSorted bool
writer atomic.Pointer[sstWriter]
writerSize atomic.Uint64
// bytes buffer for writeBatch
kvBuffer *membuf.Buffer
writeBatch []common.KvPair
// if the kvs in writeBatch are in order, we can avoid doing a `sort.Slice` which
// is quite slow. In our bench, the sort operation eats about 5% of the total CPU.
isWriteBatchSorted bool
sortedKeyBuf []byte
batchCount int
batchSize atomic.Int64
lastMetaSeq int32
tikvCodec tikv.Codec
}
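// A Writer feeds the engine through one of two paths. If the input is known to be
// sorted (isKVSorted, cleared in AppendRows when the chunk carries _tidb_rowid),
// appendRowsSorted streams the KVs straight into an sstWriter. Otherwise
// appendRowsUnsorted buffers them in writeBatch/kvBuffer, and flushKVs sorts and
// writes them out once batchSize exceeds memtableSizeLimit or flush is called.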
func (w *Writer) appendRowsSorted(kvs []common.KvPair) (err error) {
writer := w.writer.Load()
if writer == nil {
writer, err = w.createSSTWriter()
if err != nil {
return errors.Trace(err)
}
w.writer.Store(writer)
}
keyAdapter := w.engine.keyAdapter
totalKeySize := 0
for i := range kvs {
keySize := keyAdapter.EncodedLen(kvs[i].Key, kvs[i].RowID)
w.batchSize.Add(int64(keySize + len(kvs[i].Val)))
totalKeySize += keySize
}
w.batchCount += len(kvs)
// NoopKeyAdapter doesn't really change the key,
// skipping the encoding to avoid unnecessary alloc and copy.
if _, ok := keyAdapter.(common.NoopKeyAdapter); !ok {
if cap(w.sortedKeyBuf) < totalKeySize {
w.sortedKeyBuf = make([]byte, totalKeySize)
}
buf := w.sortedKeyBuf[:0]
newKvs := make([]common.KvPair, len(kvs))
for i := range kvs {
buf = keyAdapter.Encode(buf, kvs[i].Key, kvs[i].RowID)
newKvs[i] = common.KvPair{Key: buf, Val: kvs[i].Val}
buf = buf[len(buf):]
}
kvs = newKvs
}
if err := writer.writeKVs(kvs); err != nil {
return err
}
w.writerSize.Store(writer.writer.EstimatedSize())
return nil
}
func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error {
l := len(w.writeBatch)
cnt := w.batchCount
var lastKey []byte
if cnt > 0 {
lastKey = w.writeBatch[cnt-1].Key
}
keyAdapter := w.engine.keyAdapter
for _, pair := range kvs {
if w.isWriteBatchSorted && bytes.Compare(lastKey, pair.Key) > 0 {
w.isWriteBatchSorted = false
}
lastKey = pair.Key
w.batchSize.Add(int64(len(pair.Key) + len(pair.Val)))
buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key, pair.RowID))
key := keyAdapter.Encode(buf[:0], pair.Key, pair.RowID)
val := w.kvBuffer.AddBytes(pair.Val)
if cnt < l {
w.writeBatch[cnt].Key = key
w.writeBatch[cnt].Val = val
} else {
w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val})
}
cnt++
}
w.batchCount = cnt
if w.batchSize.Load() > w.memtableSizeLimit {
if err := w.flushKVs(ctx); err != nil {
return err
}
}
return nil
}
// AppendRows appends rows to the SST file.
func (w *Writer) AppendRows(ctx context.Context, columnNames []string, rows encode.Rows) error {
kvs := kv.Rows2KvPairs(rows)
if len(kvs) == 0 {
return nil
}
if w.engine.closed.Load() {
return errorEngineClosed
}
for i := range kvs {
kvs[i].Key = w.tikvCodec.EncodeKey(kvs[i].Key)
}
w.Lock()
defer w.Unlock()
// if the chunk has a _tidb_rowid field, we can't ensure that the rows are sorted.
if w.isKVSorted && w.writer.Load() == nil {
for _, c := range columnNames {
if c == model.ExtraHandleName.L {
w.isKVSorted = false
}
}
}
if w.isKVSorted {
return w.appendRowsSorted(kvs)
}
return w.appendRowsUnsorted(ctx, kvs)
}
func (w *Writer) flush(ctx context.Context) error {
w.Lock()
defer w.Unlock()
if w.batchCount == 0 {
return nil
}
if len(w.writeBatch) > 0 {
if err := w.flushKVs(ctx); err != nil {
return errors.Trace(err)
}
}
writer := w.writer.Load()
if writer != nil {
meta, err := writer.close()
if err != nil {
return errors.Trace(err)
}
w.writer.Store(nil)
w.writerSize.Store(0)
w.batchCount = 0
if meta != nil && meta.totalSize > 0 {
return w.addSST(ctx, meta)
}
}
return nil
}
// EstimatedSize returns the estimated size of the SST file.
func (w *Writer) EstimatedSize() uint64 {
if size := w.writerSize.Load(); size > 0 {
return size
}
// if kvs are still in memory, only count half of the total size;
// in our tests, the SST file size is about 50% of the raw kv size
return uint64(w.batchSize.Load()) / 2
}
type flushStatus struct {
local *Engine
seq int32
}
// Flushed implements common.ChunkFlushStatus.
func (f flushStatus) Flushed() bool {
return f.seq <= f.local.finishedMetaSeq.Load()
}
// Close implements common.ChunkFlushStatus.
func (w *Writer) Close(ctx context.Context) (common.ChunkFlushStatus, error) {
defer w.kvBuffer.Destroy()
defer w.engine.localWriters.Delete(w)
err := w.flush(ctx)
// FIXME: in theory this line is useless, but in our benchmark with go1.15
// it resolves a steadily increasing memory usage issue.
// Maybe this is a bug related to the Go GC mechanism.
w.writeBatch = nil
return flushStatus{local: w.engine, seq: w.lastMetaSeq}, err
}
// IsSynced implements common.ChunkFlushStatus.
func (w *Writer) IsSynced() bool {
return w.batchCount == 0 && w.lastMetaSeq <= w.engine.finishedMetaSeq.Load()
}
func (w *Writer) flushKVs(ctx context.Context) error {
writer, err := w.createSSTWriter()
if err != nil {
return errors.Trace(err)
}
if !w.isWriteBatchSorted {
slices.SortFunc(w.writeBatch[:w.batchCount], func(i, j common.KvPair) int {
return bytes.Compare(i.Key, j.Key)
})
w.isWriteBatchSorted = true
}
err = writer.writeKVs(w.writeBatch[:w.batchCount])
if err != nil {
return errors.Trace(err)
}
meta, err := writer.close()
if err != nil {
return errors.Trace(err)
}
failpoint.Inject("orphanWriterGoRoutine", func() {
_ = common.KillMySelf()
// mimic we meet context cancel error when `addSST`
<-ctx.Done()
time.Sleep(5 * time.Second)
failpoint.Return(errors.Trace(ctx.Err()))
})
err = w.addSST(ctx, meta)
if err != nil {
return errors.Trace(err)
}
w.batchSize.Store(0)
w.batchCount = 0
w.kvBuffer.Reset()
return nil
}
func (w *Writer) addSST(ctx context.Context, meta *sstMeta) error {
seq, err := w.engine.addSST(ctx, meta)
if err != nil {
return err
}
w.lastMetaSeq = seq
return nil
}
func (w *Writer) createSSTWriter() (*sstWriter, error) {
path := filepath.Join(w.engine.sstDir, uuid.New().String()+".sst")
writer, err := newSSTWriter(path, w.engine.config.BlockSize)
if err != nil {
return nil, err
}
sw := &sstWriter{sstMeta: &sstMeta{path: path}, writer: writer, logger: w.engine.logger}
return sw, nil
}
var errorUnorderedSSTInsertion = errors.New("inserting KVs into SST without order")
type sstWriter struct {
*sstMeta
writer *sstable.Writer
// To dedup keys before writing them into the SST file.
// NOTE: keys should be sorted and deduped when constructing one SST file.
lastKey []byte
logger log.Logger
}
func newSSTWriter(path string, blockSize int) (*sstable.Writer, error) {
f, err := vfs.Default.Create(path)
if err != nil {
return nil, errors.Trace(err)
}
// Logic to ensure the default block size is set to 16KB.
// If a smaller block size is used (e.g., 4KB, the default for Pebble),
// a single large SST file may generate a disproportionately large index block,
// potentially causing a memory spike and leading to an Out of Memory (OOM) scenario.
// If the user specifies a smaller block size, respect their choice.
if blockSize <= 0 {
blockSize = config.DefaultBlockSize
}
writable := objstorageprovider.NewFileWritable(f)
writer := sstable.NewWriter(writable, sstable.WriterOptions{
TablePropertyCollectors: []func() pebble.TablePropertyCollector{
newRangePropertiesCollector,
},
BlockSize: blockSize,
})
return writer, nil
}
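// writeKVs appends a sorted batch to the SST. The batch must start strictly after
// the previous batch's maxKey, otherwise errorUnorderedSSTInsertion is returned;
// adjacent equal keys inside a batch are skipped with a warning rather than
// written twice.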
func (sw *sstWriter) writeKVs(kvs []common.KvPair) error {
if len(kvs) == 0 {
return nil
}
if len(sw.minKey) == 0 {
sw.minKey = slices.Clone(kvs[0].Key)
}
if bytes.Compare(kvs[0].Key, sw.maxKey) <= 0 {
return errorUnorderedSSTInsertion
}
internalKey := sstable.InternalKey{
Trailer: uint64(sstable.InternalKeyKindSet),
}
for _, p := range kvs {
if sw.lastKey != nil && bytes.Equal(p.Key, sw.lastKey) {
sw.logger.Warn("duplicated key found, skip write", logutil.Key("key", p.Key))
continue
}
internalKey.UserKey = p.Key
if err := sw.writer.Add(internalKey, p.Val); err != nil {
return errors.Trace(err)
}
sw.totalSize += int64(len(p.Key)) + int64(len(p.Val))
sw.lastKey = p.Key
}
sw.totalCount += int64(len(kvs))
sw.maxKey = append(sw.maxKey[:0], sw.lastKey...)
return nil
}
func (sw *sstWriter) close() (*sstMeta, error) {
if err := sw.writer.Close(); err != nil {
return nil, errors.Trace(err)
}
meta, err := sw.writer.Metadata()
if err != nil {
return nil, errors.Trace(err)
}
sw.fileSize = int64(meta.Size)
return sw.sstMeta, nil
}
type sstIter struct {
name string
key []byte
val []byte
iter sstable.Iterator
reader *sstable.Reader
valid bool
}
// Close implements common.Iterator.
func (i *sstIter) Close() error {
if err := i.iter.Close(); err != nil {
return errors.Trace(err)
}
err := i.reader.Close()
return errors.Trace(err)
}
type sstIterHeap struct {
iters []*sstIter
}
// Len implements heap.Interface.
func (h *sstIterHeap) Len() int {
return len(h.iters)
}
// Less implements heap.Interface.
func (h *sstIterHeap) Less(i, j int) bool {
return bytes.Compare(h.iters[i].key, h.iters[j].key) < 0
}
// Swap implements heap.Interface.
func (h *sstIterHeap) Swap(i, j int) {
h.iters[i], h.iters[j] = h.iters[j], h.iters[i]
}
// Push implements heap.Interface.
func (h *sstIterHeap) Push(x any) {
h.iters = append(h.iters, x.(*sstIter))
}
// Pop implements heap.Interface.
func (h *sstIterHeap) Pop() any {
item := h.iters[len(h.iters)-1]
h.iters = h.iters[:len(h.iters)-1]
return item
}
// Next implements common.Iterator.
func (h *sstIterHeap) Next() (key, val []byte, err error) {
for {
if len(h.iters) == 0 {
return nil, nil, nil
}
iter := h.iters[0]
if iter.valid {
iter.valid = false
return iter.key, iter.val, iter.iter.Error()
}
var k *pebble.InternalKey
var v pebble.LazyValue
k, v = iter.iter.Next()
if k != nil {
vBytes, _, err := v.Value(nil)
if err != nil {
return nil, nil, errors.Trace(err)
}
iter.key = k.UserKey
iter.val = vBytes
iter.valid = true
heap.Fix(h, 0)
} else {
err := iter.Close()
heap.Remove(h, 0)
if err != nil {
return nil, nil, errors.Trace(err)
}
}
}
}
// sstIngester is an interface used to merge and ingest SST files.
// It is an interface mainly for test convenience.
type sstIngester interface {
mergeSSTs(metas []*sstMeta, dir string, blockSize int) (*sstMeta, error)
ingest([]*sstMeta) error
}
type dbSSTIngester struct {
e *Engine
}
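// mergeSSTs performs a k-way merge of the given SST files into a single new SST:
// every input file contributes an sstIter positioned at its first key, sstIterHeap
// always exposes the globally smallest current key, and exhausted iterators are
// closed and dropped from the heap. Duplicated keys across files are written only
// once (the later occurrence is skipped and the counters are adjusted).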
func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string, blockSize int) (*sstMeta, error) {
failpoint.InjectCall("beforeMergeSSTs")
failpoint.Inject("mockErrInMergeSSTs", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.New("mocked error in mergeSSTs"))
}
})
if len(metas) == 0 {
return nil, errors.New("sst metas is empty")
} else if len(metas) == 1 {
return metas[0], nil
}
start := time.Now()
newMeta := &sstMeta{
seq: metas[len(metas)-1].seq,
}
mergeIter := &sstIterHeap{
iters: make([]*sstIter, 0, len(metas)),
}
for _, p := range metas {
f, err := vfs.Default.Open(p.path)
if err != nil {
return nil, errors.Trace(err)
}
readable, err := sstable.NewSimpleReadable(f)
if err != nil {
return nil, errors.Trace(err)
}
reader, err := sstable.NewReader(readable, sstable.ReaderOptions{})
if err != nil {
return nil, errors.Trace(err)
}
iter, err := reader.NewIter(nil, nil)
if err != nil {
return nil, errors.Trace(err)
}
key, val := iter.Next()
if key == nil {
continue
}
valBytes, _, err := val.Value(nil)
if err != nil {
return nil, errors.Trace(err)
}
if iter.Error() != nil {
return nil, errors.Trace(iter.Error())
}
mergeIter.iters = append(mergeIter.iters, &sstIter{
name: p.path,
iter: iter,
key: key.UserKey,
val: valBytes,
reader: reader,
valid: true,
})
newMeta.totalSize += p.totalSize
newMeta.totalCount += p.totalCount
}
heap.Init(mergeIter)
name := filepath.Join(dir, fmt.Sprintf("%s.sst", uuid.New()))
writer, err := newSSTWriter(name, blockSize)
if err != nil {
return nil, errors.Trace(err)
}
newMeta.path = name
internalKey := sstable.InternalKey{
Trailer: uint64(sstable.InternalKeyKindSet),
}
key, val, err := mergeIter.Next()
if err != nil {
return nil, err
}
if key == nil {
return nil, errors.New("all ssts are empty")
}
newMeta.minKey = append(newMeta.minKey[:0], key...)
lastKey := make([]byte, 0)
for {
if bytes.Equal(lastKey, key) {
i.e.logger.Warn("duplicated key found, skipped", zap.Binary("key", lastKey))
newMeta.totalCount--
newMeta.totalSize -= int64(len(key) + len(val))
goto nextKey
}
internalKey.UserKey = key
err = writer.Add(internalKey, val)
if err != nil {
return nil, err
}
lastKey = append(lastKey[:0], key...)
nextKey:
key, val, err = mergeIter.Next()
if err != nil {
return nil, err
}
if key == nil {
break
}
}
err = writer.Close()
if err != nil {
return nil, errors.Trace(err)
}
meta, err := writer.Metadata()
if err != nil {
return nil, errors.Trace(err)
}
newMeta.maxKey = lastKey
newMeta.fileSize = int64(meta.Size)
dur := time.Since(start)
i.e.logger.Info("compact sst", zap.Int("fileCount", len(metas)), zap.Int64("size", newMeta.totalSize),
zap.Int64("count", newMeta.totalCount), zap.Duration("cost", dur), zap.String("file", name))
// async clean raw SSTs.
go func() {
totalSize := int64(0)
for _, m := range metas {
totalSize += m.fileSize
if err := os.Remove(m.path); err != nil {
i.e.logger.Warn("async cleanup sst file failed", zap.Error(err))
}
}
// decrease the pending size after clean up
i.e.pendingFileSize.Sub(totalSize)
}()
return newMeta, err
}
func (i dbSSTIngester) ingest(metas []*sstMeta) error {
if len(metas) == 0 {
return nil
}
paths := make([]string, 0, len(metas))
for _, m := range metas {
paths = append(paths, m.path)
}
db := i.e.getDB()
if db == nil {
return errorEngineClosed
}
return db.Ingest(paths)
}