441 lines
15 KiB
Go
441 lines
15 KiB
Go
// Copyright 2019 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package backend
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/tidb/pkg/ingestor/engineapi"
|
|
"github.com/pingcap/tidb/pkg/lightning/backend/encode"
|
|
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
|
|
"github.com/pingcap/tidb/pkg/lightning/common"
|
|
"github.com/pingcap/tidb/pkg/lightning/log"
|
|
"github.com/pingcap/tidb/pkg/lightning/metric"
|
|
"github.com/pingcap/tidb/pkg/lightning/mydump"
|
|
"github.com/pingcap/tidb/pkg/meta/model"
|
|
"github.com/pingcap/tidb/pkg/objstore/storeapi"
|
|
"github.com/pingcap/tidb/pkg/util/logutil"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
const (
|
|
// importMaxRetryTimes is the maximum number of ImportEngine attempts made by
// ClosedEngine.Import before it gives up.
importMaxRetryTimes = 3 // tikv-importer already retries internally, so we don't retry many times.
|
|
)
|
|
|
|
// makeTag builds the engine tag in the form "<tableName>:<engineID>".
func makeTag(tableName string, engineID int64) string {
	const tagFormat = "%s:%d"
	return fmt.Sprintf(tagFormat, tableName, engineID)
}
|
|
|
|
func makeLogger(logger log.Logger, tag string, engineUUID uuid.UUID) log.Logger {
|
|
return logger.With(
|
|
zap.String("engineTag", tag),
|
|
zap.Stringer("engineUUID", engineUUID),
|
|
)
|
|
}
|
|
|
|
// MakeUUID generates a UUID for the engine and a tag for the engine.
|
|
func MakeUUID(tableName string, engineID int64) (string, uuid.UUID) {
|
|
tag := makeTag(tableName, engineID)
|
|
engineUUID := uuid.NewSHA1(engineNamespace, []byte(tag))
|
|
return tag, engineUUID
|
|
}
|
|
|
|
// engineNamespace is the fixed namespace UUID that MakeUUID combines with the
// engine tag (via uuid.NewSHA1) to derive engine UUIDs.
var engineNamespace = uuid.MustParse("d68d6abe-c59e-45d6-ade8-e2b0ceb7bedf")
|
|
|
|
// EngineFileSize represents the size of an engine on disk and in memory.
|
|
type EngineFileSize struct {
|
|
// UUID is the engine's UUID.
|
|
UUID uuid.UUID
|
|
// DiskSize is the estimated total file size on disk right now.
|
|
DiskSize int64
|
|
// MemSize is the total memory size used by the engine. This is the
|
|
// estimated additional size saved onto disk after calling Flush().
|
|
MemSize int64
|
|
// IsImporting indicates whether the engine is performing Import().
|
|
IsImporting bool
|
|
}
|
|
|
|
// LocalWriterConfig defines the configuration used to open a LocalWriter.
|
|
type LocalWriterConfig struct {
|
|
// Local holds the configuration specific to the local backend.
|
|
Local struct {
|
|
// IsKVSorted indicates whether the chunk KVs written to this LocalWriter are sent in order.
|
|
IsKVSorted bool
|
|
// MemCacheSize specifies the estimated memory cache limit used by this local
|
|
// writer. It has higher priority than BackendConfig.LocalWriterMemCacheSize if
|
|
// set.
|
|
MemCacheSize int64
|
|
}
|
|
// TiDB holds the configuration specific to the TiDB backend.
|
|
TiDB struct {
|
|
// TableName is the table name passed to the TiDB backend writer.
// NOTE(review): the expected format (qualified or not) is not visible here; confirm.
TableName string
|
|
}
|
|
}
|
|
|
|
// EngineConfig defines the configuration used to open an engine.
|
|
type EngineConfig struct {
|
|
// TableInfo is the corresponding TiDB table info.
|
|
TableInfo *checkpoints.TidbTableInfo
|
|
// Local is the configuration specific to the local backend.
|
|
Local LocalEngineConfig
|
|
// External is the configuration specific to the local backend's external engine.
|
|
External *ExternalEngineConfig
|
|
// KeepSortDir indicates whether to keep the temporary sort directory
|
|
// when opening the engine, instead of removing it.
|
|
KeepSortDir bool
|
|
// TS is the preset timestamp of data in the engine. When it's 0, the used TS
|
|
// will be set lazily. This is used by local backend. This field will be written
|
|
// to engineMeta.TS and take effect in below cases:
|
|
//   - engineManager.openEngine
|
|
//   - engineManager.closeEngine only for an external engine
|
|
TS uint64
|
|
}
|
|
|
|
// LocalEngineConfig is the configuration used for local backend in OpenEngine.
|
|
type LocalEngineConfig struct {
|
|
// Compact indicates whether to compact small SSTs before ingesting them into pebble.
|
|
Compact bool
|
|
// CompactThreshold is the raw KV size threshold that triggers compaction.
|
|
CompactThreshold int64
|
|
// CompactConcurrency is the concurrency of the compact routine.
|
|
CompactConcurrency int
|
|
|
|
// BlockSize is the block size.
// NOTE(review): presumably the block size of the engine's SST files; confirm against the local backend.
|
|
BlockSize int
|
|
}
|
|
|
|
// ExternalEngineConfig is the configuration used for local backend external engine.
|
|
type ExternalEngineConfig struct {
|
|
// ExtStore is the storage handle used by the external engine.
// NOTE(review): presumably where DataFiles and StatFiles live; confirm.
ExtStore storeapi.Storage
|
|
DataFiles []string
|
|
StatFiles []string
|
|
// NOTE(review): StartKey/EndKey presumably bound the key range of this engine;
// whether EndKey is inclusive is not visible here, confirm against the consumer.
StartKey []byte
|
|
EndKey []byte
|
|
JobKeys [][]byte
|
|
SplitKeys [][]byte
|
|
// TotalFileSize can be an estimated value.
|
|
TotalFileSize int64
|
|
// TotalKVCount can be an estimated value.
|
|
TotalKVCount int64
|
|
CheckHotspot bool
|
|
// MemCapacity is the memory capacity for the whole subtask.
|
|
MemCapacity int64
|
|
// OnDup is the action when a duplicate key is found during global sort.
|
|
OnDup engineapi.OnDuplicateKey
|
|
// FilePrefix is the prefix of files recording conflicted KVs.
|
|
FilePrefix string
|
|
}
|
|
|
|
// CheckCtx contains all parameters used in CheckRequirements.
|
|
type CheckCtx struct {
|
|
// DBMetas lists database metadata from mydump.
// NOTE(review): presumably the source databases about to be imported; confirm against callers.
DBMetas []*mydump.MDDatabaseMeta
|
|
}
|
|
|
|
// TargetInfoGetter defines the interfaces to get target information.
|
|
type TargetInfoGetter interface {
|
|
// FetchRemoteDBModels obtains the models of all databases. Currently, only
|
|
// the database name is filled.
|
|
FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error)
|
|
|
|
// FetchRemoteTableModels obtains the TableInfo of given tables under the schema
|
|
// name. It returns a map whose key is the table name in lower case and value is
|
|
// the TableInfo. If the table does not exist, it will not be included in the
|
|
// map.
|
|
//
|
|
// The returned table info does not need to be precise if the encoder does not
|
|
// require it, but it must at least fill in the following fields for
|
|
// TablesFromMeta to succeed:
|
|
//   - Name
|
|
//   - State (must be model.StatePublic)
|
|
//   - ID
|
|
//   - Columns
|
|
//       * Name
|
|
//       * State (must be model.StatePublic)
|
|
//       * Offset (must be 0, 1, 2, ...)
|
|
//   - PKIsHandle (true = do not generate _tidb_rowid)
|
|
FetchRemoteTableModels(ctx context.Context, schemaName string, tableNames []string) (map[string]*model.TableInfo, error)
|
|
|
|
// CheckRequirements checks whether the backend satisfies the version requirements.
|
|
CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error
|
|
}
|
|
|
|
// Backend defines the interface for a backend.
|
|
// Implementations of this interface must be goroutine safe: you can share an
|
|
// instance and execute any method anywhere.
|
|
// Usual workflow:
|
|
//  1. Create a `Backend` for the whole process.
|
|
//  2. For each table,
|
|
//     i. Split into multiple "batches" consisting of data files with roughly equal total size.
|
|
//     ii. For each batch,
|
|
//     a. Create an `OpenedEngine` via `backend.OpenEngine()`
|
|
//     b. For each chunk, deliver data into the engine via an `EngineWriter` obtained from `engine.LocalWriter()`
|
|
//     c. When all chunks are written, obtain a `ClosedEngine` via `engine.Close()`
|
|
//     d. Import data via `engine.Import()`
|
|
//     e. Cleanup via `engine.Cleanup()`
|
|
//  3. Close the connection via `backend.Close()`
|
|
type Backend interface {
|
|
// Close the connection to the backend.
|
|
Close()
|
|
|
|
// RetryImportDelay returns the duration to sleep when retrying an import.
|
|
RetryImportDelay() time.Duration
|
|
|
|
// ShouldPostProcess returns whether KV-specific post-processing should be
|
|
// performed for this backend. Post-processing includes checksum and analyze.
|
|
ShouldPostProcess() bool
|
|
|
|
// OpenEngine opens the engine identified by engineUUID with the given config.
OpenEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error
|
|
|
|
// CloseEngine closes the engine identified by engineUUID; a closed engine is
// what ClosedEngine.Import later hands to ImportEngine.
CloseEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error
|
|
|
|
// ImportEngine imports engine data to the backend. If it returns ErrDuplicateDetected,
|
|
// it means there is duplicate detected. For this situation, all data in the engine must be imported.
|
|
// It's safe to reset or cleanup this engine.
// NOTE(review): ClosedEngine.Import checks for common.ErrFoundDuplicateKeys;
// confirm which error name is intended here.
|
|
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error
|
|
|
|
// CleanupEngine deletes the intermediate data of the engine identified by
// engineUUID. It is what ClosedEngine.Cleanup calls.
CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error
|
|
|
|
// FlushEngine ensures all KV pairs written to an open engine has been
|
|
// synchronized, such that kill-9'ing Lightning afterwards and resuming from
|
|
// checkpoint can recover the exact same content.
|
|
//
|
|
// This method is only relevant for local backend, and is no-op for all
|
|
// other backends.
|
|
FlushEngine(ctx context.Context, engineUUID uuid.UUID) error
|
|
|
|
// FlushAllEngines performs FlushEngine on all opened engines. This is a
|
|
// very expensive operation and should only be used in some rare situation
|
|
// (e.g. preparing to resolve a disk quota violation).
|
|
FlushAllEngines(ctx context.Context) error
|
|
|
|
// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
|
|
LocalWriter(ctx context.Context, cfg *LocalWriterConfig, engineUUID uuid.UUID) (EngineWriter, error)
|
|
}
|
|
|
|
// EngineManager is the manager of engines.
|
|
// It is a wrapper of Backend which provides some common methods for managing engines.
|
|
// It holds no state of its own, so it can be created on demand.
|
|
type EngineManager struct {
|
|
// backend is the Backend that all engine operations are forwarded to.
backend Backend
|
|
}
|
|
|
|
// engine holds the fields shared by OpenedEngine and ClosedEngine, both of
// which embed it.
type engine struct {
|
|
// backend is the Backend that operations on this engine are forwarded to.
backend Backend
|
|
// logger is the logger used for this engine's operations.
logger log.Logger
|
|
// uuid identifies the engine in calls to the backend.
uuid uuid.UUID
|
|
// id of the engine, used to generate uuid and stored in checkpoint
|
|
// for index engine it's -1
|
|
id int32
|
|
}
|
|
|
|
// OpenedEngine is an opened engine, allowing data to be written through an
|
|
// EngineWriter obtained from LocalWriter.
// This type is goroutine safe: you can share an instance and execute any method
|
|
// anywhere.
|
|
type OpenedEngine struct {
|
|
engine
|
|
// tableName is the table name that was passed to EngineManager.OpenEngine.
tableName string
|
|
// config is the configuration the engine was opened with; Close passes it
// on when closing the engine.
config *EngineConfig
|
|
}
|
|
|
|
// MakeEngineManager creates a new Backend from an Backend.
|
|
func MakeEngineManager(ab Backend) EngineManager {
|
|
return EngineManager{backend: ab}
|
|
}
|
|
|
|
// OpenEngine opens an engine with the given table name and engine ID.
|
|
func (be EngineManager) OpenEngine(
|
|
ctx context.Context,
|
|
config *EngineConfig,
|
|
tableName string,
|
|
engineID int32,
|
|
) (*OpenedEngine, error) {
|
|
tag, engineUUID := MakeUUID(tableName, int64(engineID))
|
|
logger := makeLogger(log.Wrap(logutil.Logger(ctx)), tag, engineUUID)
|
|
|
|
if err := be.backend.OpenEngine(ctx, config, engineUUID); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if m, ok := metric.FromContext(ctx); ok {
|
|
openCounter := m.ImporterEngineCounter.WithLabelValues("open")
|
|
openCounter.Inc()
|
|
}
|
|
|
|
logger.Info("open engine")
|
|
|
|
failpoint.Inject("FailIfEngineCountExceeds", func(val failpoint.Value) {
|
|
if m, ok := metric.FromContext(ctx); ok {
|
|
closedCounter := m.ImporterEngineCounter.WithLabelValues("closed")
|
|
openCounter := m.ImporterEngineCounter.WithLabelValues("open")
|
|
openCount := metric.ReadCounter(openCounter)
|
|
|
|
closedCount := metric.ReadCounter(closedCounter)
|
|
if injectValue := val.(int); openCount-closedCount > float64(injectValue) {
|
|
panic(fmt.Sprintf(
|
|
"forcing failure due to FailIfEngineCountExceeds: %v - %v >= %d",
|
|
openCount, closedCount, injectValue))
|
|
}
|
|
}
|
|
})
|
|
|
|
return &OpenedEngine{
|
|
engine: engine{
|
|
backend: be.backend,
|
|
logger: logger,
|
|
uuid: engineUUID,
|
|
id: engineID,
|
|
},
|
|
tableName: tableName,
|
|
config: config,
|
|
}, nil
|
|
}
|
|
|
|
// Close the opened engine to prepare it for importing.
|
|
func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error) {
|
|
closedEngine, err := engine.unsafeClose(ctx, engine.config)
|
|
if err == nil {
|
|
if m, ok := metric.FromContext(ctx); ok {
|
|
m.ImporterEngineCounter.WithLabelValues("closed").Inc()
|
|
}
|
|
}
|
|
return closedEngine, err
|
|
}
|
|
|
|
// Flush current written data for local backend
|
|
func (engine *OpenedEngine) Flush(ctx context.Context) error {
|
|
return engine.backend.FlushEngine(ctx, engine.uuid)
|
|
}
|
|
|
|
// LocalWriter returns a writer that writes to the local backend.
|
|
func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (EngineWriter, error) {
|
|
return engine.backend.LocalWriter(ctx, cfg, engine.uuid)
|
|
}
|
|
|
|
// UnsafeCloseEngine closes the engine without first opening it.
|
|
// This method is "unsafe" as it does not follow the normal operation sequence
|
|
// (Open -> Write -> Close -> Import). This method should only be used when one
|
|
// knows via other ways that the engine has already been opened, e.g. when
|
|
// resuming from a checkpoint.
|
|
func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig,
|
|
tableName string, engineID int32) (*ClosedEngine, error) {
|
|
tag, engineUUID := MakeUUID(tableName, int64(engineID))
|
|
return be.UnsafeCloseEngineWithUUID(ctx, cfg, tag, engineUUID, engineID)
|
|
}
|
|
|
|
// UnsafeCloseEngineWithUUID closes the engine without first opening it.
|
|
// This method is "unsafe" as it does not follow the normal operation sequence
|
|
// (Open -> Write -> Close -> Import). This method should only be used when one
|
|
// knows via other ways that the engine has already been opened, e.g. when
|
|
// resuming from a checkpoint.
|
|
func (be EngineManager) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string,
|
|
engineUUID uuid.UUID, id int32) (*ClosedEngine, error) {
|
|
return engine{
|
|
backend: be.backend,
|
|
logger: makeLogger(log.Wrap(logutil.Logger(ctx)), tag, engineUUID),
|
|
uuid: engineUUID,
|
|
id: id,
|
|
}.unsafeClose(ctx, cfg)
|
|
}
|
|
|
|
func (en engine) unsafeClose(ctx context.Context, cfg *EngineConfig) (*ClosedEngine, error) {
|
|
task := en.logger.Begin(zap.InfoLevel, "engine close")
|
|
err := en.backend.CloseEngine(ctx, cfg, en.uuid)
|
|
task.End(zap.ErrorLevel, err)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &ClosedEngine{engine: en}, nil
|
|
}
|
|
|
|
// GetID get engine id.
|
|
func (en engine) GetID() int32 {
|
|
return en.id
|
|
}
|
|
|
|
func (en engine) GetUUID() uuid.UUID {
|
|
return en.uuid
|
|
}
|
|
|
|
// ClosedEngine represents a closed engine, allowing ingestion into the target
|
|
// via Import and removal of its intermediate data via Cleanup.
// This type is goroutine safe: you can share an instance and execute any method
|
|
// anywhere.
|
|
type ClosedEngine struct {
|
|
engine
|
|
}
|
|
|
|
// NewClosedEngine creates a new ClosedEngine.
|
|
func NewClosedEngine(backend Backend, logger log.Logger, uuid uuid.UUID, id int32) *ClosedEngine {
|
|
return &ClosedEngine{
|
|
engine: engine{
|
|
backend: backend,
|
|
logger: logger,
|
|
uuid: uuid,
|
|
id: id,
|
|
},
|
|
}
|
|
}
|
|
|
|
// Import the data written to the engine into the target.
|
|
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error {
|
|
var err error
|
|
|
|
for i := range importMaxRetryTimes {
|
|
task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
|
|
err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize, regionSplitKeys)
|
|
if !common.IsRetryableError(err) {
|
|
if common.ErrFoundDuplicateKeys.Equal(err) {
|
|
task.End(zap.WarnLevel, err)
|
|
} else {
|
|
task.End(zap.ErrorLevel, err)
|
|
}
|
|
return err
|
|
}
|
|
task.Warn("import spuriously failed, going to retry again", log.ShortError(err))
|
|
time.Sleep(engine.backend.RetryImportDelay())
|
|
}
|
|
|
|
return errors.Annotatef(err, "[%s] import reach max retry %d and still failed", engine.uuid, importMaxRetryTimes)
|
|
}
|
|
|
|
// Cleanup deletes the intermediate data from target.
|
|
func (engine *ClosedEngine) Cleanup(ctx context.Context) error {
|
|
task := engine.logger.Begin(zap.InfoLevel, "cleanup")
|
|
err := engine.backend.CleanupEngine(ctx, engine.uuid)
|
|
task.End(zap.WarnLevel, err)
|
|
return err
|
|
}
|
|
|
|
// Logger returns the logger for the engine.
|
|
func (engine *ClosedEngine) Logger() log.Logger {
|
|
return engine.logger
|
|
}
|
|
|
|
// EngineWriter is the interface for writing data to an engine.
|
|
type EngineWriter interface {
|
|
// AppendRows writes rows into the engine; columnNames names the columns of rows.
// NOTE(review): whether columnNames may be nil/empty depends on the backend; confirm.
AppendRows(ctx context.Context, columnNames []string, rows encode.Rows) error
|
|
// IsSynced reports whether the writer is synced.
// NOTE(review): presumably true when all appended data has been flushed; confirm against implementations.
IsSynced() bool
|
|
// Close closes the writer and returns the flush status of the written chunk.
Close(ctx context.Context) (common.ChunkFlushStatus, error)
|
|
}
|
|
|
|
// GetEngineUUID returns the engine UUID.
|
|
func (engine *OpenedEngine) GetEngineUUID() uuid.UUID {
|
|
return engine.uuid
|
|
}
|