// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package external

import (
	"bytes"
	"context"
	"encoding/hex"
	"path/filepath"
	"slices"
	"sort"
	"sync"
	"time"

	"github.com/docker/go-units"
	"github.com/jfcg/sorty/v2"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/ingestor/engineapi"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/lightning/common"
	"github.com/pingcap/tidb/pkg/lightning/membuf"
	"github.com/pingcap/tidb/pkg/metrics"
	"github.com/pingcap/tidb/pkg/objstore/objectio"
	"github.com/pingcap/tidb/pkg/objstore/storeapi"
	"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
	"github.com/pingcap/tidb/pkg/util/intest"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"go.uber.org/atomic"
	"go.uber.org/zap"
)

// writeStepMemShareCount defines the number of shares of memory per job worker.
// For each job worker, the memory it can use is determined by the cpu:mem
// ratio; on a 16c32G machine, for example, each worker can use 2G of memory.
// The memory corresponding to each job worker is divided into the following
// shares, 6.5 in total:
// - one share used by HTTP and gRPC buffers, such as loadRangeBatchData and
//   writes to TiKV
// - one share used by loadRangeBatchData to store loaded data batch A
// - one share used by generateAndSendJob to handle loaded data batch B
// - one share used by the active job on the job worker
// - 2.5 shares for everything else, and for burst allocation to avoid OOM
//
// The share size 'SS' determines the max data size 'RangeS' of a split-range
// which is split out by RangeSplitter.
// A split-range is intersected with regions to generate range jobs, each of
// which is handled by a range job worker, and each range job corresponds to
// one ingested SST on TiKV.
// Our goal here is to load as much data as possible to keep all range job
// workers fully parallelized, while minimizing the number of SSTs (too many
// SST files, say 500K, will slow TiKV down during ingest), i.e. to make
// RangeS larger; we also try to make the SSTs more even, so we calculate
// RangeS by:
// - RS = region size
// - let TempRangeS = SS
// - if TempRangeS < RS, RangeS = RS / ceil(RS / TempRangeS) + 1, where the
//   trailing 1 covers the remainder when RS is divided by an odd number.
// - else RangeS = floor(TempRangeS / RS) * RS.
//
// Note: the calculation below only considers the memory taken by the KV pairs
// themselves; Go needs another 24*2 = 48B of slice headers for each KV pair,
// so if the KV pair itself is very small, the real memory taken per KV pair
// might be doubled, hence this is only an estimation.
// For example, for a simple table "create table t(id bigint primary key, v bigint, index(v))",
// each index KV is 38B, but Go needs 86B of memory to store it.
//
// RangeS for different region sizes and cpu:mem ratios, where the number in
// parentheses is the number of SST files per region:
//
// | RS    | RangeS        | RangeS        |
// |       | cpu:mem=1:1.7 | cpu:mem=1:3.5 |
// |-------|---------------|---------------|
// | 96M   | 192M(1)       | 480M(1)       |
// | 256M  | 256M(1)       | 512M(1)       |
// | 512M  | 256M(2)       | 512M(1)       |
const writeStepMemShareCount = 6.5
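
// The helper below is a minimal sketch, not part of the engine: it illustrates
// the RangeS formula documented above, assuming shareSize (SS) and regionSize
// (RS) are both byte counts. calcRangeS is a hypothetical name used only for
// illustration.
func calcRangeS(shareSize, regionSize int64) int64 {
	if shareSize < regionSize {
		// RangeS = RS / ceil(RS / SS) + 1; e.g. RS=512M with SS≈268M
		// (cpu:mem=1:1.7) gives ceil(512/268) = 2 pieces, so RangeS ≈ 256M,
		// matching the 256M(2) row in the table above.
		pieces := (regionSize + shareSize - 1) / shareSize
		return regionSize/pieces + 1
	}
	// RangeS = floor(SS / RS) * RS; e.g. RS=96M with SS≈268M gives 192M,
	// matching the 192M(1) row above.
	return shareSize / regionSize * regionSize
}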

// getEngineMemoryLimit calculates the memory limit for the external engine
// according to the memory capacity.
func getEngineMemoryLimit(memCapacity int64) int {
	// at most 3 batches can be loaded in memory, see writeStepMemShareCount.
	return int(float64(memCapacity) / writeStepMemShareCount * 3)
}
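
// A worked example of the arithmetic above, with assumed inputs: if a worker's
// share of memory (memCapacity) is 2 GiB, getEngineMemoryLimit returns
// 2 GiB / 6.5 * 3, about 945 MiB. The factor 3 matches the three shares that
// can hold loaded KV data at once: loaded batch A, batch B being handled, and
// the active job.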

type memKVsAndBuffers struct {
	mu  sync.Mutex
	kvs []KVPair
	// memKVBuffers contains two types of buffers: the first half are used for
	// small blocks, the second half for large ones.
	memKVBuffers []*membuf.Buffer
	size         int
	droppedSize  int

	// temporary fields to store KVs to reduce slice allocations.
	kvsPerFile         [][]KVPair
	droppedSizePerFile []int
}

func (b *memKVsAndBuffers) build(ctx context.Context) {
	sumKVCnt := 0
	for _, keys := range b.kvsPerFile {
		sumKVCnt += len(keys)
	}
	b.droppedSize = 0
	for _, size := range b.droppedSizePerFile {
		b.droppedSize += size
	}
	b.droppedSizePerFile = nil

	logutil.Logger(ctx).Info("building memKVsAndBuffers",
		zap.Int("sumKVCnt", sumKVCnt),
		zap.Int("droppedSize", b.droppedSize))

	b.kvs = make([]KVPair, 0, sumKVCnt)
	for i := range b.kvsPerFile {
		b.kvs = append(b.kvs, b.kvsPerFile[i]...)
		b.kvsPerFile[i] = nil
	}
	b.kvsPerFile = nil
}

// Engine stores sorted key/value pairs in an external storage.
type Engine struct {
	storage           storeapi.Storage
	dataFiles         []string
	statsFiles        []string
	startKey          []byte
	endKey            []byte
	jobKeys           [][]byte
	splitKeys         [][]byte
	smallBlockBufPool *membuf.Pool
	largeBlockBufPool *membuf.Pool

	memKVsAndBuffers memKVsAndBuffers
	// totalLoadedKVsCount accumulates the total number of KVs loaded in LoadIngestData
	totalLoadedKVsCount atomic.Int64

	// activeIngestDataFlags stores the released flags of ingest data generated
	// by this engine
	activeIngestDataFlags []*atomic.Bool

	// workerPool is the tuner of the region job worker pool related to this engine.
	workerPool atomic.Pointer[workerpool.Tuner]

	// checkHotspot being true means we will check for hotspot files when using
	// MergeKVIter. If a hotspot file is detected, we will use multiple readers
	// to read its data; if it's false, MergeKVIter will read each file using 1
	// reader. This flag also affects the strategy of loading data, either:
	// - fewer load routines + check and read hotspot files concurrently (add-index uses this one)
	// - more load routines + read each file using 1 reader (import-into uses this one)
	checkHotspot bool

	workerConcurrency atomic.Int32

	// readyCh is used to notify that the engine's resource has been updated
	readyCh chan struct{}

	ts uint64

	totalKVSize  int64
	totalKVCount int64

	importedKVSize  *atomic.Int64
	importedKVCount *atomic.Int64
	memLimit        int
	onDup           engineapi.OnDuplicateKey
	filePrefix      string
	// below fields are only used when onDup is OnDuplicateKeyRecord.
	recordedDupCnt  int
	recordedDupSize int64
	dupFile         string
	dupWriter       objectio.Writer
	dupKVStore      *KeyValueStore
}

var _ engineapi.Engine = (*Engine)(nil)

const (
	smallBlockSize = units.MiB
)

// NewExternalEngine creates an (external) engine.
func NewExternalEngine(
	ctx context.Context,
	storage storeapi.Storage,
	dataFiles []string,
	statsFiles []string,
	startKey []byte,
	endKey []byte,
	jobKeys [][]byte,
	splitKeys [][]byte,
	workerConcurrency int,
	ts uint64,
	totalKVSize int64,
	totalKVCount int64,
	checkHotspot bool,
	memCapacity int64,
	onDup engineapi.OnDuplicateKey,
	filePrefix string,
) *Engine {
	memLimit := getEngineMemoryLimit(memCapacity)
	logutil.Logger(ctx).Info("create external engine",
		zap.String("memLimitForLoadRange", units.BytesSize(float64(memLimit))),
		zap.Int("dataFileCount", len(dataFiles)),
		zap.Int("jobKeysCount", len(jobKeys)),
		zap.Int("splitKeysCount", len(splitKeys)),
	)
	memLimiter := membuf.NewLimiter(memLimit)
	return &Engine{
		storage:    storage,
		dataFiles:  dataFiles,
		statsFiles: statsFiles,
		startKey:   startKey,
		endKey:     endKey,
		jobKeys:    jobKeys,
		splitKeys:  splitKeys,
		smallBlockBufPool: membuf.NewPool(
			membuf.WithBlockNum(0),
			membuf.WithPoolMemoryLimiter(memLimiter),
			membuf.WithBlockSize(smallBlockSize),
		),
		largeBlockBufPool: membuf.NewPool(
			membuf.WithBlockNum(0),
			membuf.WithPoolMemoryLimiter(memLimiter),
			membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc),
		),
		checkHotspot:      checkHotspot,
		workerConcurrency: *atomic.NewInt32(int32(workerConcurrency)),
		readyCh:           make(chan struct{}),
		ts:                ts,
		totalKVSize:       totalKVSize,
		totalKVCount:      totalKVCount,
		importedKVSize:    atomic.NewInt64(0),
		importedKVCount:   atomic.NewInt64(0),
		memLimit:          memLimit,
		onDup:             onDup,
		filePrefix:        filePrefix,
	}
}

func getFilesReadConcurrency(
	ctx context.Context,
	storage storeapi.Storage,
	statsFiles []string,
	startKey, endKey []byte,
) ([]uint64, []uint64, error) {
	result := make([]uint64, len(statsFiles))
	offsets, err := seekPropsOffsets(ctx, []kv.Key{startKey, endKey}, statsFiles, storage)
	if err != nil {
		return nil, nil, err
	}
	startOffs, endOffs := offsets[0], offsets[1]
	totalFileSize := uint64(0)
	for i := range statsFiles {
		size := endOffs[i] - startOffs[i]
		totalFileSize += size
		expectedConc := size / uint64(ConcurrentReaderBufferSizePerConc)
		// let the stats intervals cover [startKey, endKey), since seekPropsOffsets
		// always returns an offset that is less than or equal to the key.
		expectedConc += 1
		// readAllData will enable concurrent read and use a large buffer if result[i] > 1;
		// when expectedConc < readAllDataConcThreshold, we don't use concurrent read,
		// to reduce overhead
		if expectedConc >= readAllDataConcThreshold {
			result[i] = expectedConc
		} else {
			result[i] = 1
		}
		// only log for files with expected concurrency > 1, to avoid too many logs
		if expectedConc > 1 {
			logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
				zap.String("filename", statsFiles[i]),
				zap.Uint64("startOffset", startOffs[i]),
				zap.Uint64("endOffset", endOffs[i]),
				zap.Uint64("expectedConc", expectedConc),
				zap.Uint64("concurrency", result[i]),
			)
		}
	}
	// Note: this is the file size of the range group; the KV size is smaller,
	// as the file stores an additional 8*2 bytes for each KV.
	logutil.Logger(ctx).Info("estimated file size of this range group",
		zap.String("totalSize", units.BytesSize(float64(totalFileSize))))
	return result, startOffs, nil
}
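
// A worked example of the decision above, with assumed constants (the real
// values of ConcurrentReaderBufferSizePerConc and readAllDataConcThreshold are
// defined elsewhere in this package): if the buffer size were 48 MiB and the
// threshold were 4, a 300 MiB slice of one file would yield
// expectedConc = 300/48 + 1 = 7 >= 4, so that file would be read with 7
// concurrent readers, while a 100 MiB slice would yield 100/48 + 1 = 3 < 4
// and fall back to a single reader.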

func (e *Engine) loadRangeBatchData(ctx context.Context, jobKeys [][]byte, outCh chan<- engineapi.DataAndRanges) error {
	readAndSortRateHist := metrics.GlobalSortReadFromCloudStorageRate.WithLabelValues("read_and_sort")
	readAndSortDurHist := metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("read_and_sort")
	readRateHist := metrics.GlobalSortReadFromCloudStorageRate.WithLabelValues("read")
	readDurHist := metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("read")
	sortRateHist := metrics.GlobalSortReadFromCloudStorageRate.WithLabelValues("sort")
	sortDurHist := metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("sort")

	failpoint.Inject("mockLoadBatchRegionData", func(_ failpoint.Value) {
		kvs := make([]KVPair, 0)
		kvs = append(kvs, KVPair{[]byte{}, []byte{}})
		data := e.buildIngestData(kvs, nil)
		data.IncRef()
		select {
		case <-ctx.Done():
			failpoint.Return(ctx.Err())
		case outCh <- engineapi.DataAndRanges{
			Data: data,
		}:
			e.activeIngestDataFlags = append(e.activeIngestDataFlags, data.released)
			failpoint.Return(nil)
		}
	})

	startKey := jobKeys[0]
	endKey := jobKeys[len(jobKeys)-1]
	readStart := time.Now()
	// read all data in range [startKey, endKey)
	err := readAllData(
		ctx,
		e.storage,
		e.dataFiles,
		e.statsFiles,
		startKey,
		endKey,
		e.smallBlockBufPool,
		e.largeBlockBufPool,
		&e.memKVsAndBuffers,
	)
	if err != nil {
		return err
	}
	e.memKVsAndBuffers.build(ctx)

	readDur := time.Since(readStart)
	readSecond := readDur.Seconds()
	readDurHist.Observe(readSecond)

	sortStart := time.Now()
	oldSortyGor := sorty.MaxGor
	sorty.MaxGor = uint64(e.workerConcurrency.Load() * 2)
	var dupKey, dupVal atomic.Pointer[[]byte]
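	// sorty.Sort takes a single "lesswap" callback: it must report whether
	// kvs[i] sorts strictly before kvs[k], and when r != s it must also swap
	// kvs[r] and kvs[s]. The cmp == 0 branch below piggybacks duplicate
	// detection on the sort: the first duplicated key observed is recorded
	// into dupKey/dupVal for the onDup handling further down.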
	sorty.Sort(len(e.memKVsAndBuffers.kvs), func(i, k, r, s int) bool {
		cmp := bytes.Compare(e.memKVsAndBuffers.kvs[i].Key, e.memKVsAndBuffers.kvs[k].Key)
		if cmp < 0 { // strict comparator like < or >
			if r != s {
				e.memKVsAndBuffers.kvs[r], e.memKVsAndBuffers.kvs[s] = e.memKVsAndBuffers.kvs[s], e.memKVsAndBuffers.kvs[r]
			}
			return true
		}
		if cmp == 0 && i != k {
			if dupKey.Load() == nil {
				key := slices.Clone(e.memKVsAndBuffers.kvs[i].Key)
				dupKey.Store(&key)
				value := slices.Clone(e.memKVsAndBuffers.kvs[i].Value)
				dupVal.Store(&value)
			}
		}
		return false
	})
	sorty.MaxGor = oldSortyGor
	sortDur := time.Since(sortStart)
	sortSecond := sortDur.Seconds()
	sortDurHist.Observe(sortSecond)

	// We shouldn't report duplicates for OnDuplicateKeyIgnore, as that is the
	// semantic of OnDuplicateKeyError, but we keep this behavior to stay
	// compatible with the old code.
	// TODO: remove this when we have fully integrated OnDuplicateKey.
	if k, v := dupKey.Load(), dupVal.Load(); k != nil {
		if e.onDup == engineapi.OnDuplicateKeyIgnore {
			return errors.Errorf("duplicate key found: %s", hex.EncodeToString(*k))
		} else if e.onDup == engineapi.OnDuplicateKeyError {
			return common.ErrFoundDuplicateKeys.FastGenByArgs(*k, *v)
		}
	}
	readAndSortSecond := time.Since(readStart).Seconds()
	readAndSortDurHist.Observe(readAndSortSecond)

	size := e.memKVsAndBuffers.size
	readAndSortRateHist.Observe(float64(size) / 1024.0 / 1024.0 / readAndSortSecond)
	readRateHist.Observe(float64(size) / 1024.0 / 1024.0 / readSecond)
	sortRateHist.Observe(float64(size) / 1024.0 / 1024.0 / sortSecond)

	var (
		deduplicatedKVs, dups []KVPair
		dupCount              int
		deduplicateDur        time.Duration
	)
	deduplicatedKVs = e.memKVsAndBuffers.kvs
	dupFound := dupKey.Load() != nil
	if dupFound {
		start := time.Now()
		if e.onDup == engineapi.OnDuplicateKeyRecord {
			if err = e.lazyInitDupWriter(ctx); err != nil {
				return err
			}
			deduplicatedKVs, dups, dupCount = removeDuplicates(deduplicatedKVs, getPairKey, true)
			e.recordedDupCnt += len(dups)
			for _, p := range dups {
				e.recordedDupSize += int64(len(p.Key) + len(p.Value))
				if err = e.dupKVStore.addRawKV(p.Key, p.Value); err != nil {
					return err
				}
			}
		} else if e.onDup == engineapi.OnDuplicateKeyRemove {
			deduplicatedKVs, _, dupCount = removeDuplicates(deduplicatedKVs, getPairKey, false)
		}
		deduplicateDur = time.Since(start)
	}
	logutil.Logger(ctx).Info("load range batch done",
		zap.Duration("readDur", readDur),
		zap.Duration("sortDur", sortDur),
		zap.Int("droppedSize", e.memKVsAndBuffers.droppedSize),
		zap.Int("loadedKVs", len(e.memKVsAndBuffers.kvs)),
		zap.String("loadedSize", units.HumanSize(float64(e.memKVsAndBuffers.size))),
		zap.Stringer("onDup", e.onDup),
		zap.Int("dupCount", dupCount),
		zap.Int("recordedDupCount", len(dups)),
		zap.Int("totalRecordedDupCount", e.recordedDupCnt),
		zap.String("totalRecordedDupSize", units.BytesSize(float64(e.recordedDupSize))),
		zap.Duration("deduplicateDur", deduplicateDur),
	)

	data := e.buildIngestData(
		deduplicatedKVs,
		e.memKVsAndBuffers.memKVBuffers,
	)

	// accumulate the total number of KVs loaded in LoadIngestData
	e.totalLoadedKVsCount.Add(int64(len(deduplicatedKVs)))

	// release the references held by e.memKVsAndBuffers
	e.memKVsAndBuffers.kvs = nil
	e.memKVsAndBuffers.memKVBuffers = nil
	e.memKVsAndBuffers.size = 0

	ranges := make([]engineapi.Range, 0, len(jobKeys)-1)
	prev := slices.Clone(jobKeys[0])
	for i := 1; i < len(jobKeys)-1; i++ {
		cur := slices.Clone(jobKeys[i])
		ranges = append(ranges, engineapi.Range{
			Start: prev,
			End:   cur,
		})
		prev = cur
	}
	lastKey := slices.Clone(jobKeys[len(jobKeys)-1])
	ranges = append(ranges, engineapi.Range{
		Start: prev,
		End:   lastKey,
	})

	select {
	case <-ctx.Done():
		return ctx.Err()
	case outCh <- engineapi.DataAndRanges{
		Data:         data,
		SortedRanges: ranges,
	}:
		// We can't rely on refCnt to check if the data is released, because
		// the refCnt is initialized to zero when the data is generated.
		// So we use the released field to check if the data is released.
		// If the data is discarded without calling IncRef(), it means we met
		// an error in LoadIngestData, and the input context will be canceled.
		e.activeIngestDataFlags = append(e.activeIngestDataFlags, data.released)
	}
	return nil
}

// updateActiveIngestDataFlags updates ingest data flags to remove released ones.
func (e *Engine) updateActiveIngestDataFlags() {
	res := e.activeIngestDataFlags[:0]
	for _, r := range e.activeIngestDataFlags {
		if !r.Load() {
			res = append(res, r)
		}
	}
	e.activeIngestDataFlags = res
}

// handleConcurrencyChange handles concurrency changes for this engine. If a
// change is detected, we wait for all previously emitted data to be consumed,
// then recreate the engine's memory pool. It returns the current batch size.
func (e *Engine) handleConcurrencyChange(ctx context.Context, currBatchSize int) int {
	newBatchSize := int(e.workerConcurrency.Load())
	failpoint.Inject("LoadIngestDataBatchSize", func(val failpoint.Value) {
		currBatchSize = val.(int)
		newBatchSize = currBatchSize
	})

	if newBatchSize == currBatchSize {
		e.updateActiveIngestDataFlags()
		return currBatchSize
	}

	logger := logutil.Logger(ctx).With(
		zap.Int("prev batch size", currBatchSize),
		zap.Int("new batch size", newBatchSize))

	startTime := time.Now()
	logger.Info("waiting ingest data batch size change")

	tick := time.NewTicker(time.Second)
	defer func() {
		tick.Stop()
	}()

	for {
		select {
		case <-ctx.Done():
			return currBatchSize
		case <-tick.C:
			e.updateActiveIngestDataFlags()
		}
		if len(e.activeIngestDataFlags) == 0 {
			break
		}
	}

	// Now we can safely reset the memory pool.
	e.Reset()

	logger.Info("load ingest data batch size changed", zap.Duration("used time", time.Since(startTime)))

	// Notify that we have changed the resource usage
	select {
	case <-ctx.Done():
		return currBatchSize
	case e.readyCh <- struct{}{}:
	}

	return newBatchSize
}

// LoadIngestData loads the data from the external storage to memory in the
// [start, end) range, so the local backend can ingest it. The byte slices used
// by the ingest data are allocated from the engine's buffer pools and must be
// released by MemoryIngestData.DecRef().
func (e *Engine) LoadIngestData(
	ctx context.Context,
	outCh chan<- engineapi.DataAndRanges,
) (err error) {
	if e.onDup == engineapi.OnDuplicateKeyRecord {
		defer func() {
			err1 := e.closeDupWriterAsNeeded(ctx)
			if err == nil {
				err = err1
			}
		}()
	}
	// try to make every worker busy for each batch
	currBatchSize := int(e.workerConcurrency.Load())
	logutil.Logger(ctx).Info("load ingest data", zap.Int("current batchSize", currBatchSize))

	for start := 0; start < len(e.jobKeys)-1; {
		currBatchSize = e.handleConcurrencyChange(ctx, currBatchSize)
		// we want to generate N ranges, so we need N+1 keys
		end := min(1+start+currBatchSize, len(e.jobKeys))
		err = e.loadRangeBatchData(ctx, e.jobKeys[start:end], outCh)
		if err != nil {
			return err
		}
		start += currBatchSize
	}

	// All data has been generated, no need to change the concurrency
	close(e.readyCh)
	return nil
}
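
// A minimal sketch of how a caller might drive LoadIngestData; the function
// name consumeIngestData and the goroutine structure are illustrative only,
// not an API defined by this package.
func consumeIngestData(ctx context.Context, e *Engine) error {
	outCh := make(chan engineapi.DataAndRanges)
	errCh := make(chan error, 1)
	go func() {
		errCh <- e.LoadIngestData(ctx, outCh)
	}()
	for {
		select {
		case batch := <-outCh:
			// Pin the batch before handing it to workers; its buffers are
			// destroyed once the reference count drops back to zero.
			batch.Data.IncRef()
			// ... dispatch batch.SortedRanges to region job workers here ...
			batch.Data.DecRef()
		case err := <-errCh:
			// LoadIngestData only returns after every batch has been sent,
			// so no batches are lost when we exit here.
			return err
		}
	}
}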

// lazyInitDupWriter lazily initializes the duplicate writer. We create it
// lazily because KS3 reports InvalidArgument when the uploaded file is empty,
// while GCS is fine with an empty file.
func (e *Engine) lazyInitDupWriter(ctx context.Context) error {
	if e.dupWriter != nil {
		return nil
	}
	dupFile := filepath.Join(e.filePrefix, "dup")
	dupWriter, err := e.storage.Create(ctx, dupFile, &storeapi.WriterOption{
		// TODO: might need to tune concurrency.
		// at most 150GiB of duplicates can be saved, which should be enough,
		// as we split subtasks into 100G each.
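		// (The 150GiB figure appears to assume a 10000-part multipart-upload
		// limit and a 5 MiB minimum part size: 10000 * 3*MinUploadPartSize
		// ≈ 146 GiB. Both limits are assumptions stated here for context,
		// not constants defined in this file.)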
		Concurrency: 1,
		PartSize:    3 * MinUploadPartSize})
	if err != nil {
		logutil.Logger(ctx).Error("create dup writer failed", zap.Error(err))
		return err
	}
	e.dupFile = dupFile
	e.dupWriter = dupWriter
	e.dupKVStore = NewKeyValueStore(ctx, e.dupWriter, nil)
	return nil
}

func (e *Engine) closeDupWriterAsNeeded(ctx context.Context) error {
	if e.dupWriter == nil {
		return nil
	}
	kvStore, writer := e.dupKVStore, e.dupWriter
	e.dupKVStore, e.dupWriter = nil, nil
	kvStore.finish()
	if err := writer.Close(ctx); err != nil {
		logutil.Logger(ctx).Error("close dup writer failed", zap.Error(err))
		return err
	}
	return nil
}

func (e *Engine) buildIngestData(kvs []KVPair, buf []*membuf.Buffer) *MemoryIngestData {
	return &MemoryIngestData{
		kvs:             kvs,
		ts:              e.ts,
		memBuf:          buf,
		released:        atomic.NewBool(false),
		refCnt:          atomic.NewInt64(0),
		importedKVSize:  e.importedKVSize,
		importedKVCount: e.importedKVCount,
	}
}

// SetWorkerPool sets the worker pool for this engine.
func (e *Engine) SetWorkerPool(worker workerpool.Tuner) {
	e.workerPool.Store(&worker)
}

// UpdateResource changes the concurrency and the memory pool size of this engine.
func (e *Engine) UpdateResource(ctx context.Context, concurrency int, memCapacity int64) error {
	workerPtr := e.workerPool.Load()
	if workerPtr == nil || *workerPtr == nil {
		return errors.New("region job worker is not initialized, retry later")
	}
	worker := *workerPtr

	if e.workerConcurrency.Load() == int32(concurrency) {
		return nil
	}

	// Update memLimit first, then update concurrency and wait.
	e.memLimit = getEngineMemoryLimit(memCapacity)
	e.workerConcurrency.Store(int32(concurrency))

	failpoint.InjectCall("afterUpdateWorkerConcurrency")

	// Wait for the current data to be consumed and the new memory pool to be created.
	select {
	case <-ctx.Done():
		logutil.Logger(ctx).Info("context done when updating resource, skip update resource")
		return ctx.Err()
	case _, ok := <-e.readyCh:
		if ok {
			logutil.Logger(ctx).Info("update resource for external engine",
				zap.Int("concurrency", concurrency),
				zap.String("memLimit", units.BytesSize(float64(e.memLimit))))
			worker.Tune(int32(concurrency), true)
		} else {
			logutil.Logger(ctx).Info("load data finished, skip update resource")
		}
		return nil
	}
}

// KVStatistics returns the total kv size and total kv count.
func (e *Engine) KVStatistics() (totalKVSize int64, totalKVCount int64) {
	return e.totalKVSize, e.totalKVCount
}

// ImportedStatistics returns the imported kv size and imported kv count.
func (e *Engine) ImportedStatistics() (importedSize int64, importedKVCount int64) {
	return e.importedKVSize.Load(), e.importedKVCount.Load()
}

// GetTotalLoadedKVsCount returns the total number of KVs loaded in LoadIngestData.
func (e *Engine) GetTotalLoadedKVsCount() int64 {
	return e.totalLoadedKVsCount.Load()
}

// ConflictInfo implements common.Engine.
func (e *Engine) ConflictInfo() engineapi.ConflictInfo {
	if e.recordedDupCnt == 0 {
		return engineapi.ConflictInfo{}
	}
	return engineapi.ConflictInfo{
		Count: uint64(e.recordedDupCnt),
		Files: []string{e.dupFile},
	}
}

// ID is the identifier of an engine.
func (e *Engine) ID() string {
	return "external"
}

// GetOnDup returns the OnDuplicateKey action for this engine.
func (e *Engine) GetOnDup() engineapi.OnDuplicateKey {
	return e.onDup
}

// GetKeyRange implements common.Engine.
func (e *Engine) GetKeyRange() (startKey []byte, endKey []byte, err error) {
	return e.startKey, e.endKey, nil
}

// GetRegionSplitKeys implements common.Engine.
func (e *Engine) GetRegionSplitKeys() ([][]byte, error) {
	splitKeys := make([][]byte, 0, len(e.splitKeys))
	for _, k := range e.splitKeys {
		splitKeys = append(splitKeys, slices.Clone(k))
	}
	return splitKeys, nil
}

// Close implements common.Engine.
func (e *Engine) Close() error {
	if e.smallBlockBufPool != nil {
		e.smallBlockBufPool.Destroy()
		e.smallBlockBufPool = nil
	}
	if e.largeBlockBufPool != nil {
		e.largeBlockBufPool.Destroy()
		e.largeBlockBufPool = nil
	}
	return nil
}

// Reset resets the memory buffer pool.
func (e *Engine) Reset() {
	memLimiter := membuf.NewLimiter(e.memLimit)
	if e.smallBlockBufPool != nil {
		e.smallBlockBufPool.Destroy()
		e.smallBlockBufPool = membuf.NewPool(
			membuf.WithBlockNum(0),
			membuf.WithPoolMemoryLimiter(memLimiter),
			membuf.WithBlockSize(smallBlockSize),
		)
	}
	if e.largeBlockBufPool != nil {
		e.largeBlockBufPool.Destroy()
		e.largeBlockBufPool = membuf.NewPool(
			membuf.WithBlockNum(0),
			membuf.WithPoolMemoryLimiter(memLimiter),
			membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc),
		)
	}
}

// MemoryIngestData is the in-memory implementation of IngestData.
type MemoryIngestData struct {
	kvs []KVPair
	ts  uint64

	memBuf          []*membuf.Buffer
	released        *atomic.Bool
	refCnt          *atomic.Int64
	importedKVSize  *atomic.Int64
	importedKVCount *atomic.Int64
}

var _ engineapi.IngestData = (*MemoryIngestData)(nil)

func (m *MemoryIngestData) firstAndLastKeyIndex(lowerBound, upperBound []byte) (int, int) {
	firstKeyIdx := 0
	if len(lowerBound) > 0 {
		firstKeyIdx = sort.Search(len(m.kvs), func(i int) bool {
			return bytes.Compare(lowerBound, m.kvs[i].Key) <= 0
		})
		if firstKeyIdx == len(m.kvs) {
			return -1, -1
		}
	}

	lastKeyIdx := len(m.kvs) - 1
	if len(upperBound) > 0 {
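		// sort.Search needs a predicate that flips from false to true exactly
		// once, so the search runs over reversed indexes: as i grows, keys are
		// scanned from largest to smallest, and the predicate first becomes
		// true at the last key strictly below upperBound (bounds are treated
		// as [lowerBound, upperBound)).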
		i := sort.Search(len(m.kvs), func(i int) bool {
			reverseIdx := len(m.kvs) - 1 - i
			return bytes.Compare(upperBound, m.kvs[reverseIdx].Key) > 0
		})
		if i == len(m.kvs) {
			// should not happen
			return -1, -1
		}
		lastKeyIdx = len(m.kvs) - 1 - i
	}
	return firstKeyIdx, lastKeyIdx
}

// GetFirstAndLastKey implements IngestData.GetFirstAndLastKey.
func (m *MemoryIngestData) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) {
	firstKeyIdx, lastKeyIdx := m.firstAndLastKeyIndex(lowerBound, upperBound)
	if firstKeyIdx < 0 || firstKeyIdx > lastKeyIdx {
		return nil, nil, nil
	}
	firstKey := slices.Clone(m.kvs[firstKeyIdx].Key)
	lastKey := slices.Clone(m.kvs[lastKeyIdx].Key)
	return firstKey, lastKey, nil
}

type memoryDataIter struct {
	kvs []KVPair

	firstKeyIdx int
	lastKeyIdx  int
	curIdx      int
}

// First implements ForwardIter.
func (m *memoryDataIter) First() bool {
	if m.firstKeyIdx < 0 {
		return false
	}
	m.curIdx = m.firstKeyIdx
	return true
}

// Valid implements ForwardIter.
func (m *memoryDataIter) Valid() bool {
	return m.firstKeyIdx <= m.curIdx && m.curIdx <= m.lastKeyIdx
}

// Next implements ForwardIter.
func (m *memoryDataIter) Next() bool {
	m.curIdx++
	return m.Valid()
}

// Key implements ForwardIter.
func (m *memoryDataIter) Key() []byte {
	return m.kvs[m.curIdx].Key
}

// Value implements ForwardIter.
func (m *memoryDataIter) Value() []byte {
	return m.kvs[m.curIdx].Value
}

// Close implements ForwardIter.
func (m *memoryDataIter) Close() error {
	return nil
}

// Error implements ForwardIter.
func (m *memoryDataIter) Error() error {
	return nil
}

// ReleaseBuf implements ForwardIter.
func (m *memoryDataIter) ReleaseBuf() {}

// NewIter implements IngestData.NewIter.
func (m *MemoryIngestData) NewIter(
	ctx context.Context,
	lowerBound, upperBound []byte,
	bufPool *membuf.Pool,
) engineapi.ForwardIter {
	firstKeyIdx, lastKeyIdx := m.firstAndLastKeyIndex(lowerBound, upperBound)
	iter := &memoryDataIter{
		kvs:         m.kvs,
		firstKeyIdx: firstKeyIdx,
		lastKeyIdx:  lastKeyIdx,
	}
	return iter
}

// GetTS implements IngestData.GetTS.
func (m *MemoryIngestData) GetTS() uint64 {
	return m.ts
}

// IncRef implements IngestData.IncRef.
func (m *MemoryIngestData) IncRef() {
	m.refCnt.Inc()
	// Make sure data is not released.
	intest.Assert(!m.released.Load(), "data shouldn't be released when IncRef")
}

// DecRef implements IngestData.DecRef.
func (m *MemoryIngestData) DecRef() {
	if m.refCnt.Dec() == 0 {
		m.kvs = nil
		for _, b := range m.memBuf {
			b.Destroy()
		}
		m.released.Store(true)
	}
}

// Finish implements IngestData.Finish.
func (m *MemoryIngestData) Finish(totalBytes, totalCount int64) {
	m.importedKVSize.Add(totalBytes)
	m.importedKVCount.Add(totalCount)
}