231 lines
6.9 KiB
Go
231 lines
6.9 KiB
Go
// Copyright 2022 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package ingest
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"github.com/google/uuid"
|
|
tidbkv "github.com/pingcap/tidb/pkg/kv"
|
|
"github.com/pingcap/tidb/pkg/lightning/backend"
|
|
"github.com/pingcap/tidb/pkg/lightning/backend/kv"
|
|
"github.com/pingcap/tidb/pkg/lightning/common"
|
|
"github.com/pingcap/tidb/pkg/util/generic"
|
|
"github.com/pingcap/tidb/pkg/util/logutil"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Engine is the interface for the engine that can be used to write key-value pairs.
|
|
type Engine interface {
|
|
Flush() error
|
|
Close(cleanup bool)
|
|
CreateWriter(id int, writerCfg *backend.LocalWriterConfig) (Writer, error)
|
|
}
|
|
|
|
// Writer is the interface for the writer that can be used to write key-value pairs.
|
|
type Writer interface {
|
|
// WriteRow writes one row into downstream.
|
|
// To enable uniqueness check, the handle should be non-empty.
|
|
WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tidbkv.Handle) error
|
|
LockForWrite() (unlock func())
|
|
WrittenBytes() int64
|
|
}
|
|
|
|
// engineInfo is the engine for one index reorg task, each task will create several new writers under the
|
|
// Opened Engine. Note engineInfo is not thread safe.
|
|
type engineInfo struct {
|
|
ctx context.Context
|
|
jobID int64
|
|
indexID int64
|
|
unique bool
|
|
openedEngine *backend.OpenedEngine
|
|
|
|
uuid uuid.UUID
|
|
backend backend.Backend
|
|
writerCache generic.SyncMap[int, backend.EngineWriter]
|
|
memRoot MemRoot
|
|
flushLock *sync.RWMutex
|
|
}
|
|
|
|
// newEngineInfo create a new engineInfo struct.
|
|
func newEngineInfo(
|
|
ctx context.Context,
|
|
jobID, indexID int64,
|
|
unique bool,
|
|
en *backend.OpenedEngine,
|
|
uuid uuid.UUID,
|
|
bk backend.Backend,
|
|
memRoot MemRoot,
|
|
) *engineInfo {
|
|
return &engineInfo{
|
|
ctx: ctx,
|
|
jobID: jobID,
|
|
indexID: indexID,
|
|
unique: unique,
|
|
openedEngine: en,
|
|
uuid: uuid,
|
|
backend: bk,
|
|
writerCache: generic.NewSyncMap[int, backend.EngineWriter](4),
|
|
memRoot: memRoot,
|
|
flushLock: &sync.RWMutex{},
|
|
}
|
|
}
|
|
|
|
// Flush imports all the key-values in engine to the storage.
|
|
func (ei *engineInfo) Flush() error {
|
|
if ei.openedEngine == nil {
|
|
logutil.Logger(ei.ctx).Warn("engine is not open, skipping flush",
|
|
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
|
|
return nil
|
|
}
|
|
err := ei.openedEngine.Flush(ei.ctx)
|
|
if err != nil {
|
|
logutil.Logger(ei.ctx).Error(LitErrFlushEngineErr, zap.Error(err),
|
|
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Close closes the engine and `cleanup` controls whether removes the local intermediate files.
|
|
func (ei *engineInfo) Close(cleanup bool) {
|
|
if ei.openedEngine == nil {
|
|
return
|
|
}
|
|
err := ei.closeWriters()
|
|
if err != nil {
|
|
logutil.Logger(ei.ctx).Warn(LitErrCloseWriterErr, zap.Error(err),
|
|
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
|
|
}
|
|
if cleanup {
|
|
defer func() {
|
|
err = ei.backend.CleanupEngine(ei.ctx, ei.uuid)
|
|
if err != nil {
|
|
logutil.Logger(ei.ctx).Warn(LitErrCleanEngineErr, zap.Error(err),
|
|
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
|
|
}
|
|
}()
|
|
}
|
|
_, err = ei.openedEngine.Close(ei.ctx)
|
|
if err != nil {
|
|
logutil.Logger(ei.ctx).Warn(LitErrCloseEngineErr, zap.Error(err),
|
|
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
|
|
return
|
|
}
|
|
ei.openedEngine = nil
|
|
}
|
|
|
|
// writerContext is used to keep a lightning local writer for each backfill worker.
|
|
type writerContext struct {
|
|
ctx context.Context
|
|
lWrite backend.EngineWriter
|
|
fLock *sync.RWMutex
|
|
writtenBytes int64
|
|
}
|
|
|
|
// CreateWriter creates a new writerContext.
|
|
func (ei *engineInfo) CreateWriter(id int, writerCfg *backend.LocalWriterConfig) (Writer, error) {
|
|
ei.memRoot.RefreshConsumption()
|
|
ok := ei.memRoot.CheckConsume(structSizeWriterCtx)
|
|
if !ok {
|
|
return nil, genWriterAllocMemFailedErr(ei.ctx, ei.memRoot, ei.jobID, ei.indexID)
|
|
}
|
|
|
|
wCtx, err := ei.newWriterContext(id, writerCfg)
|
|
if err != nil {
|
|
logutil.Logger(ei.ctx).Error(LitErrCreateContextFail, zap.Error(err),
|
|
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID),
|
|
zap.Int("worker ID", id))
|
|
return nil, err
|
|
}
|
|
|
|
logutil.Logger(ei.ctx).Info(LitInfoCreateWrite, zap.Int64("job ID", ei.jobID),
|
|
zap.Int64("index ID", ei.indexID), zap.Int("worker ID", id),
|
|
zap.Int64("allocate memory", structSizeWriterCtx+writerCfg.Local.MemCacheSize),
|
|
zap.Int64("current memory usage", ei.memRoot.CurrentUsage()),
|
|
zap.Int64("max memory quota", ei.memRoot.MaxMemoryQuota()))
|
|
return wCtx, err
|
|
}
|
|
|
|
// newWriterContext will get worker local writer from engine info writer cache first, if exists.
|
|
// If local writer not exist, then create new one and store it into engine info writer cache.
|
|
// note: operate ei.writeCache map is not thread safe please make sure there is sync mechanism to
|
|
// make sure the safe.
|
|
func (ei *engineInfo) newWriterContext(workerID int, writerCfg *backend.LocalWriterConfig) (*writerContext, error) {
|
|
lWrite, exist := ei.writerCache.Load(workerID)
|
|
if !exist {
|
|
ok := ei.memRoot.CheckConsume(writerCfg.Local.MemCacheSize)
|
|
if !ok {
|
|
return nil, genWriterAllocMemFailedErr(ei.ctx, ei.memRoot, ei.jobID, ei.indexID)
|
|
}
|
|
var err error
|
|
lWrite, err = ei.openedEngine.LocalWriter(ei.ctx, writerCfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Cache the local writer.
|
|
ei.writerCache.Store(workerID, lWrite)
|
|
}
|
|
wc := &writerContext{
|
|
ctx: ei.ctx,
|
|
lWrite: lWrite,
|
|
fLock: ei.flushLock,
|
|
}
|
|
return wc, nil
|
|
}
|
|
|
|
func (ei *engineInfo) closeWriters() error {
|
|
var firstErr error
|
|
for _, wid := range ei.writerCache.Keys() {
|
|
if w, ok := ei.writerCache.Load(wid); ok {
|
|
_, err := w.Close(ei.ctx)
|
|
if err != nil {
|
|
if firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
}
|
|
}
|
|
ei.writerCache.Delete(wid)
|
|
}
|
|
return firstErr
|
|
}
|
|
|
|
// WriteRow Write one row into local writer buffer.
|
|
func (wCtx *writerContext) WriteRow(ctx context.Context, key, idxVal []byte, handle tidbkv.Handle) error {
|
|
kvs := make([]common.KvPair, 1)
|
|
kvs[0].Key = key
|
|
kvs[0].Val = idxVal
|
|
if handle != nil {
|
|
kvs[0].RowID = handle.Encoded()
|
|
}
|
|
wCtx.writtenBytes += int64(len(key) + len(idxVal))
|
|
row := kv.MakeRowsFromKvPairs(kvs)
|
|
return wCtx.lWrite.AppendRows(ctx, nil, row)
|
|
}
|
|
|
|
// LockForWrite locks the local writer for write.
|
|
func (wCtx *writerContext) LockForWrite() (unlock func()) {
|
|
wCtx.fLock.RLock()
|
|
return func() {
|
|
wCtx.fLock.RUnlock()
|
|
}
|
|
}
|
|
|
|
// WrittenBytes returns the number of bytes written by this writer.
|
|
func (wCtx *writerContext) WrittenBytes() int64 {
|
|
return wCtx.writtenBytes
|
|
}
|