432 lines
14 KiB
Go
432 lines
14 KiB
Go
// Copyright 2025 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package checkpoint
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/pingcap/errors"
|
|
backuppb "github.com/pingcap/kvproto/pkg/brpb"
|
|
"github.com/pingcap/tidb/br/pkg/glue"
|
|
"github.com/pingcap/tidb/pkg/domain"
|
|
"github.com/pingcap/tidb/pkg/objstore/storeapi"
|
|
"github.com/pingcap/tidb/pkg/parser/ast"
|
|
)
|
|
|
|
type SnapshotMetaManagerT = MetaManager[
|
|
RestoreKeyType, RestoreValueType, RestoreValueType, CheckpointMetadataForSnapshotRestore,
|
|
]
|
|
type LogMetaManagerT = LogMetaManager[
|
|
LogRestoreKeyType, LogRestoreValueType, LogRestoreValueMarshaled, CheckpointMetadataForLogRestore,
|
|
]
|
|
|
|
type tickDurationConfig struct {
|
|
tickDurationForFlush time.Duration
|
|
tickDurationForChecksum time.Duration
|
|
retryDuration time.Duration
|
|
}
|
|
|
|
func DefaultTickDurationConfig() tickDurationConfig {
|
|
return tickDurationConfig{
|
|
tickDurationForFlush: defaultTickDurationForFlush,
|
|
tickDurationForChecksum: defaultTickDurationForChecksum,
|
|
retryDuration: defaultRetryDuration,
|
|
}
|
|
}
|
|
|
|
type MetaManager[K KeyType, SV, LV ValueType, M any] interface {
|
|
fmt.Stringer
|
|
|
|
LoadCheckpointData(context.Context, func(K, LV) error) (time.Duration, error)
|
|
LoadCheckpointChecksum(context.Context) (map[int64]*ChecksumItem, time.Duration, error)
|
|
LoadCheckpointMetadata(context.Context) (*M, error)
|
|
SaveCheckpointMetadata(context.Context, *M) error
|
|
ExistsCheckpointMetadata(context.Context) (bool, error)
|
|
RemoveCheckpointData(context.Context) error
|
|
|
|
// start checkpoint runner
|
|
StartCheckpointRunner(
|
|
context.Context, tickDurationConfig, func(*RangeGroup[K, SV]) ([]byte, error),
|
|
) (*CheckpointRunner[K, SV], error)
|
|
|
|
// close session
|
|
Close()
|
|
}
|
|
|
|
type LogMetaManager[K KeyType, SV, LV ValueType, M any] interface {
|
|
MetaManager[K, SV, LV, M]
|
|
|
|
LoadCheckpointProgress(context.Context) (*CheckpointProgress, error)
|
|
SaveCheckpointProgress(context.Context, *CheckpointProgress) error
|
|
ExistsCheckpointProgress(context.Context) (bool, error)
|
|
|
|
LoadCheckpointIngestIndexRepairSQLs(context.Context) (*CheckpointIngestIndexRepairSQLs, error)
|
|
SaveCheckpointIngestIndexRepairSQLs(context.Context, *CheckpointIngestIndexRepairSQLs) error
|
|
ExistsCheckpointIngestIndexRepairSQLs(context.Context) (bool, error)
|
|
|
|
TryGetStorage() storeapi.Storage
|
|
}
|
|
|
|
type TableMetaManager[K KeyType, SV, LV ValueType, M any] struct {
|
|
se glue.Session
|
|
runnerSe glue.Session
|
|
dom *domain.Domain
|
|
dbName string
|
|
}
|
|
|
|
func NewLogTableMetaManager(
|
|
g glue.Glue,
|
|
dom *domain.Domain,
|
|
dbName string,
|
|
restoreID uint64,
|
|
) (LogMetaManagerT, error) {
|
|
se, err := g.CreateSession(dom.Store())
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
runnerSe, err := g.CreateSession(dom.Store())
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
return &TableMetaManager[
|
|
LogRestoreKeyType, LogRestoreValueType, LogRestoreValueMarshaled, CheckpointMetadataForLogRestore,
|
|
]{
|
|
se: se,
|
|
runnerSe: runnerSe,
|
|
dom: dom,
|
|
dbName: fmt.Sprintf("%s_%d", dbName, restoreID),
|
|
}, nil
|
|
}
|
|
|
|
func NewSnapshotTableMetaManager(
|
|
g glue.Glue,
|
|
dom *domain.Domain,
|
|
dbName string,
|
|
restoreID uint64,
|
|
) (SnapshotMetaManagerT, error) {
|
|
se, err := g.CreateSession(dom.Store())
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
runnerSe, err := g.CreateSession(dom.Store())
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
return &TableMetaManager[
|
|
RestoreKeyType, RestoreValueType, RestoreValueType, CheckpointMetadataForSnapshotRestore,
|
|
]{
|
|
se: se,
|
|
runnerSe: runnerSe,
|
|
dom: dom,
|
|
dbName: fmt.Sprintf("%s_%d", dbName, restoreID),
|
|
}, nil
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) String() string {
|
|
return fmt.Sprintf("databases[such as %s]", manager.dbName)
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) Close() {
|
|
if manager.se != nil {
|
|
manager.se.Close()
|
|
}
|
|
if manager.runnerSe != nil {
|
|
manager.runnerSe.Close()
|
|
}
|
|
}
|
|
|
|
// LoadCheckpointData loads the whole checkpoint range data and retrieve the metadata of restored ranges
|
|
// and return the total time cost in the past executions
|
|
func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointData(
|
|
ctx context.Context,
|
|
fn func(K, LV) error,
|
|
) (time.Duration, error) {
|
|
execCtx := manager.se.GetSessionCtx().GetRestrictedSQLExecutor()
|
|
return selectCheckpointData(ctx, execCtx, manager.dbName, fn)
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointChecksum(
|
|
ctx context.Context,
|
|
) (map[int64]*ChecksumItem, time.Duration, error) {
|
|
execCtx := manager.se.GetSessionCtx().GetRestrictedSQLExecutor()
|
|
return selectCheckpointChecksum(ctx, execCtx, manager.dbName)
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointMetadata(
|
|
ctx context.Context,
|
|
) (*M, error) {
|
|
execCtx := manager.se.GetSessionCtx().GetRestrictedSQLExecutor()
|
|
m := new(M)
|
|
err := selectCheckpointMeta(ctx, execCtx, manager.dbName, checkpointMetaTableName, m)
|
|
return m, err
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) SaveCheckpointMetadata(
|
|
ctx context.Context,
|
|
meta *M,
|
|
) error {
|
|
err := initCheckpointTable(ctx, manager.se, manager.dbName,
|
|
[]string{checkpointDataTableName, checkpointChecksumTableName})
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
if meta != nil {
|
|
return insertCheckpointMeta(ctx, manager.se, manager.dbName, checkpointMetaTableName, meta)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) ExistsCheckpointMetadata(
|
|
ctx context.Context,
|
|
) (bool, error) {
|
|
// we only check the existence of the checkpoint data table
|
|
// because the checkpoint metadata is not used for restore
|
|
return manager.dom.InfoSchema().
|
|
TableExists(ast.NewCIStr(manager.dbName), ast.NewCIStr(checkpointMetaTableName)), nil
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) RemoveCheckpointData(
|
|
ctx context.Context,
|
|
) error {
|
|
return dropCheckpointTables(ctx, manager.dom, manager.se, manager.dbName, []string{
|
|
checkpointDataTableName,
|
|
checkpointChecksumTableName,
|
|
checkpointMetaTableName,
|
|
checkpointProgressTableName,
|
|
checkpointIngestTableName,
|
|
})
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointProgress(
|
|
ctx context.Context,
|
|
) (*CheckpointProgress, error) {
|
|
execCtx := manager.se.GetSessionCtx().GetRestrictedSQLExecutor()
|
|
m := &CheckpointProgress{}
|
|
err := selectCheckpointMeta(ctx, execCtx, manager.dbName, checkpointProgressTableName, m)
|
|
return m, errors.Trace(err)
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) SaveCheckpointProgress(
|
|
ctx context.Context,
|
|
meta *CheckpointProgress,
|
|
) error {
|
|
return insertCheckpointMeta(ctx, manager.se, manager.dbName, checkpointProgressTableName, meta)
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) ExistsCheckpointProgress(
|
|
ctx context.Context,
|
|
) (bool, error) {
|
|
return manager.dom.InfoSchema().
|
|
TableExists(ast.NewCIStr(manager.dbName), ast.NewCIStr(checkpointProgressTableName)), nil
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointIngestIndexRepairSQLs(
|
|
ctx context.Context,
|
|
) (*CheckpointIngestIndexRepairSQLs, error) {
|
|
execCtx := manager.se.GetSessionCtx().GetRestrictedSQLExecutor()
|
|
m := &CheckpointIngestIndexRepairSQLs{}
|
|
err := selectCheckpointMeta(ctx, execCtx, manager.dbName, checkpointIngestTableName, m)
|
|
return m, errors.Trace(err)
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) SaveCheckpointIngestIndexRepairSQLs(
|
|
ctx context.Context,
|
|
meta *CheckpointIngestIndexRepairSQLs,
|
|
) error {
|
|
return insertCheckpointMeta(ctx, manager.se, manager.dbName, checkpointIngestTableName, meta)
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) ExistsCheckpointIngestIndexRepairSQLs(
|
|
ctx context.Context,
|
|
) (bool, error) {
|
|
return manager.dom.InfoSchema().
|
|
TableExists(ast.NewCIStr(manager.dbName), ast.NewCIStr(checkpointIngestTableName)), nil
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) StartCheckpointRunner(
|
|
ctx context.Context,
|
|
cfg tickDurationConfig,
|
|
valueMarshaler func(*RangeGroup[K, SV]) ([]byte, error),
|
|
) (*CheckpointRunner[K, SV], error) {
|
|
runner := newCheckpointRunner(
|
|
newTableCheckpointStorage(manager.runnerSe, manager.dbName),
|
|
nil, valueMarshaler)
|
|
manager.runnerSe = nil
|
|
// for restore, no need to set lock
|
|
runner.startCheckpointMainLoop(
|
|
ctx,
|
|
cfg.tickDurationForFlush, cfg.tickDurationForChecksum, 0, cfg.retryDuration)
|
|
return runner, nil
|
|
}
|
|
|
|
func (manager *TableMetaManager[K, SV, LV, M]) TryGetStorage() storeapi.Storage {
|
|
return nil
|
|
}
|
|
|
|
type StorageMetaManager[K KeyType, SV, LV ValueType, M any] struct {
|
|
storage storeapi.Storage
|
|
cipher *backuppb.CipherInfo
|
|
clusterID string
|
|
taskName string
|
|
}
|
|
|
|
func NewSnapshotStorageMetaManager(
|
|
storage storeapi.Storage,
|
|
cipher *backuppb.CipherInfo,
|
|
clusterID uint64,
|
|
prefix string,
|
|
restoreID uint64,
|
|
) SnapshotMetaManagerT {
|
|
return &StorageMetaManager[
|
|
RestoreKeyType, RestoreValueType, RestoreValueType, CheckpointMetadataForSnapshotRestore,
|
|
]{
|
|
storage: storage,
|
|
cipher: cipher,
|
|
clusterID: fmt.Sprintf("%d", clusterID),
|
|
taskName: fmt.Sprintf("%d/%s_%d", clusterID, prefix, restoreID),
|
|
}
|
|
}
|
|
|
|
func NewLogStorageMetaManager(
|
|
storage storeapi.Storage,
|
|
cipher *backuppb.CipherInfo,
|
|
clusterID uint64,
|
|
prefix string,
|
|
restoreID uint64,
|
|
) LogMetaManagerT {
|
|
return &StorageMetaManager[
|
|
LogRestoreKeyType, LogRestoreValueType, LogRestoreValueMarshaled, CheckpointMetadataForLogRestore,
|
|
]{
|
|
storage: storage,
|
|
cipher: cipher,
|
|
clusterID: fmt.Sprintf("%d", clusterID),
|
|
taskName: fmt.Sprintf("%d/%s_%d", clusterID, prefix, restoreID),
|
|
}
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) String() string {
|
|
return fmt.Sprintf("path[%s]", fmt.Sprintf(CheckpointRestoreDirFormat, manager.clusterID))
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) Close() {}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointData(
|
|
ctx context.Context,
|
|
fn func(K, LV) error,
|
|
) (time.Duration, error) {
|
|
return walkCheckpointFile(ctx, manager.storage, manager.cipher, getCheckpointDataDirByName(manager.taskName), fn)
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointChecksum(
|
|
ctx context.Context,
|
|
) (map[int64]*ChecksumItem, time.Duration, error) {
|
|
return loadCheckpointChecksum(ctx, manager.storage, getCheckpointChecksumDirByName(manager.taskName))
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointMetadata(
|
|
ctx context.Context,
|
|
) (*M, error) {
|
|
m := new(M)
|
|
err := loadCheckpointMeta(ctx, manager.storage, getCheckpointMetaPathByName(manager.taskName), m)
|
|
return m, errors.Trace(err)
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) SaveCheckpointMetadata(
|
|
ctx context.Context,
|
|
meta *M,
|
|
) error {
|
|
return saveCheckpointMetadata(ctx, manager.storage, meta, getCheckpointMetaPathByName(manager.taskName))
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) ExistsCheckpointMetadata(
|
|
ctx context.Context,
|
|
) (bool, error) {
|
|
return manager.storage.FileExists(ctx, getCheckpointMetaPathByName(manager.taskName))
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) RemoveCheckpointData(
|
|
ctx context.Context,
|
|
) error {
|
|
prefix := fmt.Sprintf(CheckpointRestoreDirFormat, manager.taskName)
|
|
return removeCheckpointData(ctx, manager.storage, prefix)
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointProgress(
|
|
ctx context.Context,
|
|
) (*CheckpointProgress, error) {
|
|
m := &CheckpointProgress{}
|
|
err := loadCheckpointMeta(ctx, manager.storage, getCheckpointProgressPathByName(manager.taskName), m)
|
|
return m, errors.Trace(err)
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) SaveCheckpointProgress(
|
|
ctx context.Context,
|
|
meta *CheckpointProgress,
|
|
) error {
|
|
return saveCheckpointMetadata(ctx, manager.storage, meta, getCheckpointProgressPathByName(manager.taskName))
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) ExistsCheckpointProgress(
|
|
ctx context.Context,
|
|
) (bool, error) {
|
|
return manager.storage.FileExists(ctx, getCheckpointProgressPathByName(manager.taskName))
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointIngestIndexRepairSQLs(
|
|
ctx context.Context,
|
|
) (*CheckpointIngestIndexRepairSQLs, error) {
|
|
m := &CheckpointIngestIndexRepairSQLs{}
|
|
err := loadCheckpointMeta(ctx, manager.storage, getCheckpointIngestIndexPathByName(manager.taskName), m)
|
|
return m, errors.Trace(err)
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) SaveCheckpointIngestIndexRepairSQLs(
|
|
ctx context.Context,
|
|
meta *CheckpointIngestIndexRepairSQLs,
|
|
) error {
|
|
return saveCheckpointMetadata(ctx, manager.storage, meta, getCheckpointIngestIndexPathByName(manager.taskName))
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) ExistsCheckpointIngestIndexRepairSQLs(
|
|
ctx context.Context,
|
|
) (bool, error) {
|
|
return manager.storage.FileExists(ctx, getCheckpointIngestIndexPathByName(manager.taskName))
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) StartCheckpointRunner(
|
|
ctx context.Context,
|
|
cfg tickDurationConfig,
|
|
valueMarshaler func(*RangeGroup[K, SV]) ([]byte, error),
|
|
) (*CheckpointRunner[K, SV], error) {
|
|
checkpointStorage, err := newExternalCheckpointStorage(
|
|
ctx, manager.storage, nil, flushPathForRestore(manager.taskName))
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
runner := newCheckpointRunner(checkpointStorage, manager.cipher, valueMarshaler)
|
|
|
|
// for restore, no need to set lock
|
|
runner.startCheckpointMainLoop(
|
|
ctx,
|
|
cfg.tickDurationForFlush, cfg.tickDurationForChecksum, 0, cfg.retryDuration)
|
|
return runner, nil
|
|
}
|
|
|
|
func (manager *StorageMetaManager[K, SV, LV, M]) TryGetStorage() storeapi.Storage {
|
|
return manager.storage
|
|
}
|