// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importer

import (
	"context"
	"io"
	"math"
	"net"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	"github.com/docker/go-units"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	tidb "github.com/pingcap/tidb/pkg/config"
	"github.com/pingcap/tidb/pkg/dxf/framework/proto"
	"github.com/pingcap/tidb/pkg/keyspace"
	tidbkv "github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/lightning/backend"
	"github.com/pingcap/tidb/pkg/lightning/backend/encode"
	"github.com/pingcap/tidb/pkg/lightning/backend/kv"
	"github.com/pingcap/tidb/pkg/lightning/backend/local"
	"github.com/pingcap/tidb/pkg/lightning/checkpoints"
	"github.com/pingcap/tidb/pkg/lightning/common"
	"github.com/pingcap/tidb/pkg/lightning/config"
	"github.com/pingcap/tidb/pkg/lightning/log"
	"github.com/pingcap/tidb/pkg/lightning/metric"
	"github.com/pingcap/tidb/pkg/lightning/mydump"
	verify "github.com/pingcap/tidb/pkg/lightning/verification"
	"github.com/pingcap/tidb/pkg/meta/autoid"
	tidbmetrics "github.com/pingcap/tidb/pkg/metrics"
	"github.com/pingcap/tidb/pkg/objstore/compressedio"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/sessionctx/vardef"
	"github.com/pingcap/tidb/pkg/sessiontxn"
	statshandle "github.com/pingcap/tidb/pkg/statistics/handle"
	"github.com/pingcap/tidb/pkg/table"
	"github.com/pingcap/tidb/pkg/table/tables"
	tidbutil "github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/etcd"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/pingcap/tidb/pkg/util/promutil"
	"github.com/pingcap/tidb/pkg/util/sqlexec"
	"github.com/pingcap/tidb/pkg/util/sqlkiller"
	"github.com/pingcap/tidb/pkg/util/syncutil"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/util"
	"github.com/tikv/pd/client/pkg/caller"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/multierr"
	"go.uber.org/zap"
)

// NewTiKVModeSwitcher is declared as a var so we can mock it in tests.
var NewTiKVModeSwitcher = local.NewTiKVModeSwitcher

var (
	// CheckDiskQuotaInterval is the default time interval to check disk quota.
	// TODO: make it dynamically adjusting according to the speed of import and the disk size.
	CheckDiskQuotaInterval = 10 * time.Second

	// defaultMaxEngineSize is the default max engine size in bytes.
	// We make it 5 times larger than the lightning default engine size to reduce range overlap,
	// especially for indexes, since we have one index engine per distributed subtask.
	// For 1 TiB of data, we can divide it into 2 engines that run on 2 TiDB nodes, which gives a
	// good balance between range overlap and sort speed in one of our tests with:
	//   - 10 columns, PK + 6 secondary indexes, 2 of which are multi-valued indexes
	//   - 1.05 KiB per row, 527 MiB per file, 1024000000 rows, 1 TiB total
	//
	// It might not be the optimal value for other cases.
	defaultMaxEngineSize = int64(5 * config.DefaultBatchSize)
)
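// illustrateDefaultMaxEngineSizeGiB is an illustrative sketch only and is not
// used by the importer. It just spells out the value above: assuming
// lightning's config.DefaultBatchSize is about 100 GiB, defaultMaxEngineSize
// works out to roughly 500 GiB, which is the "default e.MaxEngineSize = 500GiB"
// referred to in calculateSubtaskCnt below.
func illustrateDefaultMaxEngineSizeGiB() int64 {
	const gib = int64(1) << 30
	// 5 * ~100 GiB ≈ 500 GiB; the exact value depends on config.DefaultBatchSize.
	return defaultMaxEngineSize / gib
}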
// Chunk records the chunk information.
type Chunk struct {
	Path         string
	FileSize     int64
	Offset       int64
	EndOffset    int64
	PrevRowIDMax int64
	RowIDMax     int64
	Type         mydump.SourceType
	Compression  mydump.Compression
	Timestamp    int64
	ParquetMeta  mydump.ParquetFileMeta
}

// prepareSortDir creates a new directory for import and removes the previous
// sort directory if it exists.
func prepareSortDir(e *LoadDataController, id string, tidbCfg *tidb.Config) (string, error) {
	importDir := GetImportRootDir(tidbCfg)
	sortDir := filepath.Join(importDir, id)

	if info, err := os.Stat(importDir); err != nil || !info.IsDir() {
		if err != nil && !os.IsNotExist(err) {
			e.logger.Error("stat import dir failed", zap.String("import_dir", importDir), zap.Error(err))
			return "", errors.Trace(err)
		}
		if info != nil && !info.IsDir() {
			e.logger.Warn("import dir is not a dir, remove it", zap.String("import_dir", importDir))
			if err := os.RemoveAll(importDir); err != nil {
				return "", errors.Trace(err)
			}
		}
		e.logger.Info("import dir not exists, create it", zap.String("import_dir", importDir))
		if err := os.MkdirAll(importDir, 0o700); err != nil {
			e.logger.Error("failed to make dir", zap.String("import_dir", importDir), zap.Error(err))
			return "", errors.Trace(err)
		}
	}

	// todo: remove this after we support checkpoint
	if _, err := os.Stat(sortDir); err != nil {
		if !os.IsNotExist(err) {
			e.logger.Error("stat sort dir failed", zap.String("sort_dir", sortDir), zap.Error(err))
			return "", errors.Trace(err)
		}
	} else {
		e.logger.Warn("sort dir already exists, remove it", zap.String("sort_dir", sortDir))
		if err := os.RemoveAll(sortDir); err != nil {
			return "", errors.Trace(err)
		}
	}
	return sortDir, nil
}

// GetRegionSplitSizeKeys gets the region split size and keys from PD.
func GetRegionSplitSizeKeys(ctx context.Context) (regionSplitSize int64, regionSplitKeys int64, err error) {
	tidbCfg := tidb.GetGlobalConfig()
	tls, err := common.NewTLS(
		tidbCfg.Security.ClusterSSLCA, tidbCfg.Security.ClusterSSLCert, tidbCfg.Security.ClusterSSLKey,
		"", nil, nil, nil,
	)
	if err != nil {
		return 0, 0, err
	}
	tlsOpt := tls.ToPDSecurityOption()
	addrs := strings.Split(tidbCfg.Path, ",")
	pdCli, err := NewClientWithContext(ctx, caller.Component("tidb-table-importer"), addrs, tlsOpt)
	if err != nil {
		return 0, 0, errors.Trace(err)
	}
	defer pdCli.Close()
	return local.GetRegionSplitSizeKeys(ctx, pdCli, tls)
}
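// illustrateSortDirLayout is an illustrative sketch only and is not used by
// the importer. It shows where prepareSortDir places the per-import sort
// directory, i.e. <temp-dir>/import-<port>/<import-id>, by reusing
// GetImportRootDir defined later in this file.
func illustrateSortDirLayout(tidbCfg *tidb.Config, importID string) string {
	return filepath.Join(GetImportRootDir(tidbCfg), importID)
}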
// NewTableImporter creates a new table importer.
func NewTableImporter(
	ctx context.Context,
	e *LoadDataController,
	id string,
	kvStore tidbkv.Storage,
) (ti *TableImporter, err error) {
	idAlloc := kv.NewPanickingAllocators(e.Table.Meta().SepAutoInc())
	tbl, err := tables.TableFromMeta(idAlloc, e.Table.Meta())
	if err != nil {
		return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", e.Table.Meta().Name)
	}

	tidbCfg := tidb.GetGlobalConfig()
	// todo: we only need to prepare this once on each node (we might call it 3 times in the distributed framework)
	dir, err := prepareSortDir(e, id, tidbCfg)
	if err != nil {
		return nil, err
	}

	hostPort := net.JoinHostPort("127.0.0.1", strconv.Itoa(int(tidbCfg.Status.StatusPort)))
	tls, err := common.NewTLS(
		tidbCfg.Security.ClusterSSLCA, tidbCfg.Security.ClusterSSLCert, tidbCfg.Security.ClusterSSLKey,
		hostPort, nil, nil, nil,
	)
	if err != nil {
		return nil, err
	}

	backendConfig := e.getLocalBackendCfg(kvStore.GetKeyspace(), tidbCfg.Path, dir)
	d := kvStore.(tidbkv.StorageWithPD).GetPDClient().GetServiceDiscovery()
	localBackend, err := local.NewBackend(ctx, tls, backendConfig, d)
	if err != nil {
		return nil, err
	}

	return &TableImporter{
		LoadDataController: e,
		id:                 id,
		backend:            localBackend,
		kvStore:            kvStore,
		tableInfo: &checkpoints.TidbTableInfo{
			ID:   e.Table.Meta().ID,
			Name: e.Table.Meta().Name.O,
			Core: e.Table.Meta(),
		},
		encTable: tbl,
		dbID:     e.DBID,
		keyspace: kvStore.GetCodec().GetKeyspace(),
		logger:   e.logger.With(zap.String("import-id", id)),
		// this is the value we use for 50 TiB data parallel import.
		// it might not be the optimal value.
		// todo: use different defaults for single-node import and distributed import.
		regionSplitSize: 2 * int64(config.SplitRegionSize),
		regionSplitKeys: 2 * int64(config.SplitRegionKeys),
		diskQuota:       adjustDiskQuota(int64(e.DiskQuota), dir, e.logger),
		diskQuotaLock:   new(syncutil.RWMutex),
	}, nil
}

// TableImporter is a table importer.
type TableImporter struct {
	*LoadDataController
	// id is the unique id for this importer.
	// it's the task id if we are running in the distributed framework, else it's a
	// uuid. we use this id to create a unique directory for this importer.
	id        string
	backend   *local.Backend
	kvStore   tidbkv.Storage
	tableInfo *checkpoints.TidbTableInfo
	// this table has a separate id allocator used to record the max row id allocated.
	encTable table.Table
	dbID     int64
	keyspace []byte

	logger          *zap.Logger
	regionSplitSize int64
	regionSplitKeys int64
	diskQuota       int64
	diskQuotaLock   *syncutil.RWMutex

	chunkCh chan QueryChunk
}

type storeHelper struct {
	kvStore tidbkv.Storage
}

func (*storeHelper) GetTS(_ context.Context) (physical, logical int64, err error) {
	return 0, 0, nil
}

func (s *storeHelper) GetTiKVCodec() tikv.Codec {
	return s.kvStore.GetCodec()
}

var _ local.StoreHelper = (*storeHelper)(nil)
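// illustrateEngineLifecycle is an illustrative sketch only and is not used by
// the importer. It shows the intended order of the engine APIs on TableImporter
// for a single data engine: open, write (elided), close, then import and clean
// up. See ImportSelectedRows below for a complete flow that also handles the
// index engine and checksum verification.
func illustrateEngineLifecycle(ctx context.Context, ti *TableImporter) error {
	dataEngine, err := ti.OpenDataEngine(ctx, 1)
	if err != nil {
		return err
	}
	// ... encode rows and write KV pairs into dataEngine here ...
	closedEngine, err := dataEngine.Close(ctx)
	if err != nil {
		return err
	}
	_, err = ti.ImportAndCleanup(ctx, closedEngine)
	return err
}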
// NewTableImporterForTest creates a new table importer for test.
func NewTableImporterForTest(ctx context.Context, e *LoadDataController, id string, kvStore tidbkv.Storage) (*TableImporter, error) {
	helper := &storeHelper{kvStore: kvStore}
	idAlloc := kv.NewPanickingAllocators(e.Table.Meta().SepAutoInc())
	tbl, err := tables.TableFromMeta(idAlloc, e.Table.Meta())
	if err != nil {
		return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", e.Table.Meta().Name)
	}

	tidbCfg := tidb.GetGlobalConfig()
	dir, err := prepareSortDir(e, id, tidbCfg)
	if err != nil {
		return nil, err
	}

	backendConfig := e.getLocalBackendCfg("", tidbCfg.Path, dir)
	localBackend, err := local.NewBackendForTest(ctx, backendConfig, helper)
	if err != nil {
		return nil, err
	}

	keyspace := helper.GetTiKVCodec().GetKeyspace()
	return &TableImporter{
		LoadDataController: e,
		id:                 id,
		backend:            localBackend,
		tableInfo: &checkpoints.TidbTableInfo{
			ID:   e.Table.Meta().ID,
			Name: e.Table.Meta().Name.O,
			Core: e.Table.Meta(),
		},
		keyspace:      keyspace,
		encTable:      tbl,
		dbID:          e.DBID,
		logger:        e.logger.With(zap.String("import-id", id)),
		diskQuotaLock: new(syncutil.RWMutex),
	}, nil
}

// GetKeySpace gets the keyspace of the kv store.
func (ti *TableImporter) GetKeySpace() []byte {
	return ti.keyspace
}

// GetKVStore gets the kv store.
func (ti *TableImporter) GetKVStore() tidbkv.Storage {
	return ti.kvStore
}

func (e *LoadDataController) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) {
	info := LoadDataReaderInfo{
		Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
			reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, e.dataStore, compressedio.DecompressConfig{
				ZStdDecodeConcurrency: 1,
			})
			if err != nil {
				return nil, errors.Trace(err)
			}
			return reader, nil
		},
		Remote: &chunk.FileMeta,
	}
	parser, err := e.GetParser(ctx, info)
	if err != nil {
		return nil, err
	}
	if chunk.Chunk.Offset == 0 {
		// if the data file is split, only the first chunk needs to skip rows.
		// see the check in initOptions.
		if err = e.HandleSkipNRows(parser); err != nil {
			return nil, err
		}
		parser.SetRowID(chunk.Chunk.PrevRowIDMax)
	} else {
		// if we reached here, the file must be an uncompressed CSV file.
		if err = parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax); err != nil {
			return nil, err
		}
	}
	return parser, nil
}

func (ti *TableImporter) getKVEncoder(chunk *checkpoints.ChunkCheckpoint) (*TableKVEncoder, error) {
	return ti.LoadDataController.getKVEncoder(ti.logger, chunk, ti.encTable)
}

func (e *LoadDataController) getKVEncoder(logger *zap.Logger, chunk *checkpoints.ChunkCheckpoint, encTable table.Table) (*TableKVEncoder, error) {
	cfg := &encode.EncodingConfig{
		SessionOptions: encode.SessionOptions{
			SQLMode:        e.SQLMode,
			Timestamp:      chunk.Timestamp,
			SysVars:        e.ImportantSysVars,
			AutoRandomSeed: chunk.Chunk.PrevRowIDMax,
		},
		Path:   chunk.FileMeta.Path,
		Table:  encTable,
		Logger: log.Logger{Logger: logger.With(zap.String("path", chunk.FileMeta.Path))},
	}
	return NewTableKVEncoder(cfg, e)
}

// GetKVEncoderForDupResolve gets the KV encoder for duplicate resolution.
func (ti *TableImporter) GetKVEncoderForDupResolve() (*TableKVEncoder, error) {
	cfg := &encode.EncodingConfig{
		SessionOptions: encode.SessionOptions{
			SQLMode: ti.SQLMode,
			SysVars: ti.ImportantSysVars,
		},
		Table:                ti.encTable,
		Logger:               log.Logger{Logger: ti.logger},
		UseIdentityAutoRowID: true,
	}
	return NewTableKVEncoderForDupResolve(cfg, ti.LoadDataController)
}

func (e *LoadDataController) calculateSubtaskCnt() int {
	// we want to split data files into subtasks of size close to MaxEngineSize to reduce range overlap,
	// and evenly distribute them to subtasks.
	// we calculate the subtask count first by round(TotalFileSize / maxEngineSize).
	// AllocateEngineIDs uses ceil() to calculate the subtask count, so the engine size might be too small
	// in some cases, such as 501G of data where maxEngineSize would be about 250G, so we don't rely on it.
	// see https://github.com/pingcap/tidb/blob/b4183e1dc9bb01fb81d3aa79ca4b5b74387c6c2a/br/pkg/lightning/mydump/region.go#L109
	//
	// for the default e.MaxEngineSize = 500GiB, we have:
	// data size range(G)   cnt   adjusted-engine-size range(G)
	// [0, 750)              1    [0, 750)
	// [750, 1250)           2    [375, 625)
	// [1250, 1750)          3    [416, 583)
	// [1750, 2250)          4    [437, 562)
	var (
		subtaskCount  float64
		maxEngineSize = int64(e.MaxEngineSize)
	)
	if e.TotalFileSize <= maxEngineSize {
		subtaskCount = 1
	} else {
		subtaskCount = math.Round(float64(e.TotalFileSize) / float64(e.MaxEngineSize))
	}

	// for a global sort task, since there is no overlap, we make sure the subtask
	// count is a multiple of the execute node count.
	// see the sketch after SetExecuteNodeCnt for a worked example.
	if e.IsGlobalSort() && e.ExecuteNodesCnt > 0 {
		subtaskCount = math.Ceil(subtaskCount/float64(e.ExecuteNodesCnt)) * float64(e.ExecuteNodesCnt)
	}
	return int(subtaskCount)
}

func (e *LoadDataController) getAdjustedMaxEngineSize() int64 {
	subtaskCount := e.calculateSubtaskCnt()
	// we adjust MaxEngineSize to make sure each subtask has a similar amount of data to import.
	return int64(math.Ceil(float64(e.TotalFileSize) / float64(subtaskCount)))
}

// SetExecuteNodeCnt sets the execute node count.
func (e *LoadDataController) SetExecuteNodeCnt(cnt int) {
	e.ExecuteNodesCnt = cnt
}
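// illustrateSubtaskCount is an illustrative sketch only and is not used by the
// importer. It mirrors calculateSubtaskCnt and getAdjustedMaxEngineSize above:
// for 800 GiB of data and a 500 GiB max engine size, round(800/500) = 2
// subtasks with an adjusted engine size of ceil(800/2) = 400 GiB; with global
// sort on 3 execute nodes, the count is rounded up to 3 and the adjusted size
// becomes ceil(800/3) ≈ 267 GiB.
func illustrateSubtaskCount(totalFileSize, maxEngineSize int64, executeNodesCnt int, globalSort bool) (subtasks int, adjustedEngineSize int64) {
	cnt := 1.0
	if totalFileSize > maxEngineSize {
		cnt = math.Round(float64(totalFileSize) / float64(maxEngineSize))
	}
	if globalSort && executeNodesCnt > 0 {
		cnt = math.Ceil(cnt/float64(executeNodesCnt)) * float64(executeNodesCnt)
	}
	return int(cnt), int64(math.Ceil(float64(totalFileSize) / cnt))
}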
// PopulateChunks populates chunks from table regions.
// in the distributed framework, this should be done on the tidb node that is responsible for
// splitting the job into subtasks; the table-importer then handles the data that belongs to each subtask.
func (e *LoadDataController) PopulateChunks(ctx context.Context) (chunksMap map[int32][]Chunk, err error) {
	task := log.BeginTask(e.logger, "populate chunks")
	defer func() {
		task.End(zap.ErrorLevel, err)
	}()

	tableMeta := &mydump.MDTableMeta{
		DB:        e.DBName,
		Name:      e.Table.Meta().Name.O,
		DataFiles: e.toMyDumpFiles(),
	}
	adjustedMaxEngineSize := e.getAdjustedMaxEngineSize()
	e.logger.Info("adjust max engine size", zap.Int64("before", int64(e.MaxEngineSize)),
		zap.Int64("after", adjustedMaxEngineSize))
	dataDivideCfg := &mydump.DataDivideConfig{
		ColumnCnt:              len(e.Table.Meta().Columns),
		EngineDataSize:         adjustedMaxEngineSize,
		MaxChunkSize:           int64(config.MaxRegionSize),
		Concurrency:            e.ThreadCnt,
		IOWorkers:              nil,
		Store:                  e.dataStore,
		TableMeta:              tableMeta,
		StrictFormat:           e.SplitFile,
		DataCharacterSet:       *e.Charset,
		DataInvalidCharReplace: string(utf8.RuneError),
		ReadBlockSize:          LoadDataReadBlockSize,
		CSV:                    *e.GenerateCSVConfig(),
		SkipParquetRowCount:    common.SkipReadRowCount(e.Table.Meta()),
	}
	makeEngineCtx := logutil.WithLogger(ctx, e.logger)
	tableRegions, err2 := mydump.MakeTableRegions(makeEngineCtx, dataDivideCfg)
	if err2 != nil {
		e.logger.Error("populate chunks failed", zap.Error(err2))
		return nil, err2
	}

	timestamp := time.Now().Unix()
	// engineChunks maps an engine ID to the chunks assigned to that engine.
	engineChunks := make(map[int32][]Chunk, 0)
	for _, region := range tableRegions {
		chunks, found := engineChunks[region.EngineID]
		if !found {
			chunks = make([]Chunk, 0)
		}
		engineChunks[region.EngineID] = append(chunks, Chunk{
			Path:         region.FileMeta.Path,
			FileSize:     region.FileMeta.FileSize,
			Offset:       region.Chunk.Offset,
			EndOffset:    region.Chunk.EndOffset,
			PrevRowIDMax: region.Chunk.PrevRowIDMax,
			RowIDMax:     region.Chunk.RowIDMax,
			Type:         region.FileMeta.Type,
			Compression:  region.FileMeta.Compression,
			Timestamp:    timestamp,
			ParquetMeta:  region.FileMeta.ParquetMeta,
		})
	}

	// Add the index engine checkpoint
	engineChunks[common.IndexEngineID] = make([]Chunk, 0)
	return engineChunks, nil
}

// getTotalRawFileSize is a simplified version of EstimateCompactionThreshold.
func (ti *TableImporter) getTotalRawFileSize(indexCnt int64) int64 {
	var totalSize int64
	for _, file := range ti.dataFiles {
		totalSize += file.RealSize
	}
	return totalSize * indexCnt
}

// OpenIndexEngine opens an index engine.
func (ti *TableImporter) OpenIndexEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error) {
	idxEngineCfg := &backend.EngineConfig{
		TableInfo: ti.tableInfo,
	}
	idxCnt := len(ti.tableInfo.Core.Indices)
	if !common.TableHasAutoRowID(ti.tableInfo.Core) {
		idxCnt--
	}
	// todo: getTotalRawFileSize returns the size of all data files, but in the distributed framework,
	// we create one index engine for each engine; this should be reflected here in the future.
	threshold := local.EstimateCompactionThreshold2(ti.getTotalRawFileSize(int64(idxCnt)))
	idxEngineCfg.Local = backend.LocalEngineConfig{
		Compact:            threshold > 0,
		CompactConcurrency: 4,
		CompactThreshold:   threshold,
		BlockSize:          16 * 1024,
	}
	fullTableName := ti.FullTableName()
	// todo: cleanup all engine data on any error since we don't support checkpoint for now.
	// some return paths don't make sure all data engines and index engines are cleaned up.
	// maybe we can add this at an upper level to clean the whole local-sort directory.
	mgr := backend.MakeEngineManager(ti.backend)
	return mgr.OpenEngine(ctx, idxEngineCfg, fullTableName, engineID)
}
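// illustrateCountDataChunks is an illustrative sketch only and is not used by
// the importer. It shows the shape of the map returned by PopulateChunks:
// every data engine maps to its chunks, and common.IndexEngineID maps to an
// empty slice that PopulateChunks always adds for the index engine checkpoint.
func illustrateCountDataChunks(engineChunks map[int32][]Chunk) map[int32]int {
	counts := make(map[int32]int, len(engineChunks))
	for engineID, chunks := range engineChunks {
		if engineID == common.IndexEngineID {
			// skip the empty index-engine entry
			continue
		}
		counts[engineID] = len(chunks)
	}
	return counts
}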
// OpenDataEngine opens a data engine.
func (ti *TableImporter) OpenDataEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error) {
	dataEngineCfg := &backend.EngineConfig{
		TableInfo: ti.tableInfo,
	}
	// todo: support checking IsRowOrdered later.
	// also see the test result here: https://github.com/pingcap/tidb/pull/47147
	//if ti.tableMeta.IsRowOrdered {
	//	dataEngineCfg.Local.Compact = true
	//	dataEngineCfg.Local.CompactConcurrency = 4
	//	dataEngineCfg.Local.CompactThreshold = local.CompactionUpperThreshold
	//}
	mgr := backend.MakeEngineManager(ti.backend)
	return mgr.OpenEngine(ctx, dataEngineCfg, ti.FullTableName(), engineID)
}

// ImportAndCleanup imports the engine and cleans up the engine data.
func (ti *TableImporter) ImportAndCleanup(ctx context.Context, closedEngine *backend.ClosedEngine) (int64, error) {
	var kvCount int64
	importErr := closedEngine.Import(ctx, ti.regionSplitSize, ti.regionSplitKeys)
	if common.ErrFoundDuplicateKeys.Equal(importErr) {
		importErr = local.ConvertToErrFoundConflictRecords(importErr, ti.encTable)
	}
	if closedEngine.GetID() != common.IndexEngineID {
		// todo: change to a finer-grained progress later.
		// each row is encoded into 1 data key
		kvCount = ti.backend.GetImportedKVCount(closedEngine.GetUUID())
	}
	cleanupErr := closedEngine.Cleanup(ctx)
	return kvCount, multierr.Combine(importErr, cleanupErr)
}

// Backend returns the backend of the importer.
func (ti *TableImporter) Backend() *local.Backend {
	return ti.backend
}

// Close implements the io.Closer interface.
func (ti *TableImporter) Close() error {
	if ti.LoadDataController != nil {
		ti.LoadDataController.Close()
	}
	ti.backend.Close()
	return nil
}

// Allocators returns allocators used to record max used ID, i.e. PanickingAllocators.
func (ti *TableImporter) Allocators() autoid.Allocators {
	return ti.encTable.Allocators(nil)
}

// CheckDiskQuota checks disk quota.
func (ti *TableImporter) CheckDiskQuota(ctx context.Context) {
	var locker sync.Locker
	lockDiskQuota := func() {
		if locker == nil {
			ti.diskQuotaLock.Lock()
			locker = ti.diskQuotaLock
		}
	}
	unlockDiskQuota := func() {
		if locker != nil {
			locker.Unlock()
			locker = nil
		}
	}
	defer unlockDiskQuota()
	ti.logger.Info("start checking disk quota", zap.String("disk-quota", units.BytesSize(float64(ti.diskQuota))))
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(CheckDiskQuotaInterval):
		}

		largeEngines, inProgressLargeEngines, totalDiskSize, totalMemSize := local.CheckDiskQuota(ti.backend, ti.diskQuota)
		if len(largeEngines) == 0 && inProgressLargeEngines == 0 {
			unlockDiskQuota()
			continue
		}

		ti.logger.Warn("disk quota exceeded",
			zap.Int64("diskSize", totalDiskSize),
			zap.Int64("memSize", totalMemSize),
			zap.Int64("quota", ti.diskQuota),
			zap.Int("largeEnginesCount", len(largeEngines)),
			zap.Int("inProgressLargeEnginesCount", inProgressLargeEngines))

		lockDiskQuota()

		if len(largeEngines) == 0 {
			ti.logger.Warn("all large engines are already importing, keep blocking all writes")
			continue
		}

		if err := ti.backend.FlushAllEngines(ctx); err != nil {
			ti.logger.Error("flush engine for disk quota failed, check again later", log.ShortError(err))
			unlockDiskQuota()
			continue
		}

		// at this point, all engines are synchronized on disk.
		// we then import every large engine one by one and complete.
		// if any engine failed to import, we just try again next time, since the data are still intact.
		var importErr error
		for _, engine := range largeEngines {
			// Use a larger split region size to avoid splitting the same region many times.
			if err := ti.backend.UnsafeImportAndReset(
				ctx,
				engine,
				int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio),
				int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio),
			); err != nil {
				if common.ErrFoundDuplicateKeys.Equal(err) {
					err = local.ConvertToErrFoundConflictRecords(err, ti.encTable)
				}
				importErr = multierr.Append(importErr, err)
			}
		}
		if importErr != nil {
			// discuss: should we return the error and cancel the import?
			ti.logger.Error("import large engines failed, check again later", log.ShortError(importErr))
		}
		unlockDiskQuota()
	}
}

// SetSelectedChunkCh sets the channel to receive selected rows.
func (ti *TableImporter) SetSelectedChunkCh(ch chan QueryChunk) {
	ti.chunkCh = ch
}

func (ti *TableImporter) closeAndCleanupEngine(engine *backend.OpenedEngine) {
	// outer context might be done, so we create a new context here.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	closedEngine, err := engine.Close(ctx)
	if err != nil {
		ti.logger.Error("close engine failed", zap.Error(err))
		return
	}
	if err = closedEngine.Cleanup(ctx); err != nil {
		ti.logger.Error("cleanup engine failed", zap.Error(err))
	}
}

// ImportSelectedRows imports selected rows.
func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.Context) (int64, error) {
	var (
		err                     error
		dataEngine, indexEngine *backend.OpenedEngine
	)
	metrics := tidbmetrics.GetRegisteredImportMetrics(promutil.NewDefaultFactory(), prometheus.Labels{
		proto.TaskIDLabelName: ti.id,
	})
	ctx = metric.WithCommonMetric(ctx, metrics)
	defer func() {
		tidbmetrics.UnregisterImportMetrics(metrics)
		if dataEngine != nil {
			ti.closeAndCleanupEngine(dataEngine)
		}
		if indexEngine != nil {
			ti.closeAndCleanupEngine(indexEngine)
		}
	}()

	dataEngine, err = ti.OpenDataEngine(ctx, 1)
	if err != nil {
		return 0, err
	}
	indexEngine, err = ti.OpenIndexEngine(ctx, common.IndexEngineID)
	if err != nil {
		return 0, err
	}

	var (
		mu       sync.Mutex
		checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
	)
	eg, egCtx := tidbutil.NewErrorGroupWithRecoverWithCtx(ctx)
	for range ti.ThreadCnt {
		eg.Go(func() error {
			chunkCheckpoint := checkpoints.ChunkCheckpoint{}
			chunkChecksum := verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
			defer func() {
				mu.Lock()
				defer mu.Unlock()
				checksum.Add(chunkChecksum)
			}()
			return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, ti.logger, chunkChecksum, nil)
		})
	}
	if err = eg.Wait(); err != nil {
		return 0, err
	}

	closedDataEngine, err := dataEngine.Close(ctx)
	if err != nil {
		return 0, err
	}
	failpoint.Inject("mockImportFromSelectErr", func() {
		failpoint.Return(0, errors.New("mock import from select error"))
	})
	if err = closedDataEngine.Import(ctx, ti.regionSplitSize, ti.regionSplitKeys); err != nil {
		if common.ErrFoundDuplicateKeys.Equal(err) {
			err = local.ConvertToErrFoundConflictRecords(err, ti.encTable)
		}
		return 0, err
	}
	dataKVCount := ti.backend.GetImportedKVCount(closedDataEngine.GetUUID())

	closedIndexEngine, err := indexEngine.Close(ctx)
	if err != nil {
		return 0, err
	}
	if err = closedIndexEngine.Import(ctx, ti.regionSplitSize, ti.regionSplitKeys); err != nil {
		if common.ErrFoundDuplicateKeys.Equal(err) {
			err = local.ConvertToErrFoundConflictRecords(err, ti.encTable)
		}
		return 0, err
	}

	allocators := ti.Allocators()
	maxIDs := map[autoid.AllocatorType]int64{
		autoid.RowIDAllocType:    allocators.Get(autoid.RowIDAllocType).Base(),
		autoid.AutoIncrementType: allocators.Get(autoid.AutoIncrementType).Base(),
		autoid.AutoRandomType:    allocators.Get(autoid.AutoRandomType).Base(),
	}
	if err = PostProcess(ctx, se, maxIDs, ti.Plan, checksum, ti.logger); err != nil {
		return 0, err
	}
	return dataKVCount, nil
}

func adjustDiskQuota(diskQuota int64, sortDir string, logger *zap.Logger) int64 {
	sz, err := common.GetStorageSize(sortDir)
	if err != nil {
		logger.Warn("failed to get storage size", zap.Error(err))
		if diskQuota != 0 {
			return diskQuota
		}
		logger.Info("use default quota instead", zap.Int64("quota", int64(DefaultDiskQuota)))
		return int64(DefaultDiskQuota)
	}

	maxDiskQuota := int64(float64(sz.Capacity) * 0.8)
	switch {
	case diskQuota == 0:
		logger.Info("use 0.8 of the storage size as default disk quota",
			zap.String("quota", units.HumanSize(float64(maxDiskQuota))))
		return maxDiskQuota
	case diskQuota > maxDiskQuota:
		logger.Warn("disk quota is larger than 0.8 of the storage size, use 0.8 of the storage size instead",
			zap.String("quota", units.HumanSize(float64(maxDiskQuota))))
		return maxDiskQuota
	default:
		return diskQuota
	}
}

// PostProcess does the post-processing for the task.
// exported for testing.
func PostProcess(
	ctx context.Context,
	se sessionctx.Context,
	maxIDs map[autoid.AllocatorType]int64,
	plan *Plan,
	localChecksum *verify.KVGroupChecksum,
	logger *zap.Logger,
) (err error) {
	callLog := log.BeginTask(logger.With(zap.Object("checksum", localChecksum)), "post process")
	defer func() {
		callLog.End(zap.ErrorLevel, err)
	}()

	if err = RebaseAllocatorBases(ctx, se.GetStore(), maxIDs, plan, logger); err != nil {
		return err
	}

	return VerifyChecksum(ctx, plan, localChecksum.MergedChecksum(), logger,
		func() (*local.RemoteChecksum, error) {
			return RemoteChecksumTableBySQL(ctx, se, plan, logger)
		},
	)
}

type autoIDRequirement struct {
	store     tidbkv.Storage
	autoidCli *autoid.ClientDiscover
}

func (r *autoIDRequirement) Store() tidbkv.Storage {
	return r.store
}

func (r *autoIDRequirement) AutoIDClient() *autoid.ClientDiscover {
	return r.autoidCli
}

// RebaseAllocatorBases rebases the allocator bases.
func RebaseAllocatorBases(ctx context.Context, kvStore tidbkv.Storage, maxIDs map[autoid.AllocatorType]int64, plan *Plan, logger *zap.Logger) (err error) {
	callLog := log.BeginTask(logger.With(zap.Any("maxIDs", maxIDs)), "rebase allocators")
	defer func() {
		callLog.End(zap.ErrorLevel, err)
	}()

	if !common.TableHasAutoID(plan.DesiredTableInfo) {
		return nil
	}

	tidbCfg := tidb.GetGlobalConfig()
	hostPort := net.JoinHostPort("127.0.0.1", strconv.Itoa(int(tidbCfg.Status.StatusPort)))
	tls, err2 := common.NewTLS(
		tidbCfg.Security.ClusterSSLCA, tidbCfg.Security.ClusterSSLCert, tidbCfg.Security.ClusterSSLKey,
		hostPort, nil, nil, nil,
	)
	if err2 != nil {
		return err2
	}

	addrs := strings.Split(tidbCfg.Path, ",")
	etcdCli, err := clientv3.New(clientv3.Config{
		Endpoints:        addrs,
		AutoSyncInterval: 30 * time.Second,
		TLS:              tls.TLSConfig(),
	})
	if err != nil {
		return errors.Trace(err)
	}
	etcd.SetEtcdCliByNamespace(etcdCli, keyspace.MakeKeyspaceEtcdNamespace(kvStore.GetCodec()))

	autoidCli := autoid.NewClientDiscover(etcdCli)
	r := autoIDRequirement{store: kvStore, autoidCli: autoidCli}
	err = common.RebaseTableAllocators(ctx, maxIDs, &r, plan.DBID, plan.DesiredTableInfo)
	if err1 := etcdCli.Close(); err1 != nil {
		logger.Info("close etcd client error", zap.Error(err1))
	}
	autoidCli.ResetConn(nil)
	return errors.Trace(err)
}

type remoteChecksumFunction func() (*local.RemoteChecksum, error)
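// illustrateDiskQuotaCap is an illustrative sketch only and is not used by the
// importer. It mirrors the capping logic of adjustDiskQuota above, without the
// storage probing: the effective quota is at most 80% of the sort directory's
// capacity, and when no quota is requested, 80% of the capacity is used.
func illustrateDiskQuotaCap(requestedQuota, capacity int64) int64 {
	maxDiskQuota := int64(float64(capacity) * 0.8)
	if requestedQuota == 0 || requestedQuota > maxDiskQuota {
		return maxDiskQuota
	}
	return requestedQuota
}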
// VerifyChecksum verifies the checksum of the table.
func VerifyChecksum(ctx context.Context, plan *Plan, localChecksum verify.KVChecksum, logger *zap.Logger, getRemoteChecksumFn remoteChecksumFunction) error {
	if plan.Checksum == config.OpLevelOff {
		logger.Info("checksum turned off, skip", zap.Object("checksum", &localChecksum))
		return nil
	}
	logger.Info("local checksum", zap.Object("checksum", &localChecksum))

	failpoint.Inject("waitCtxDone", func() {
		<-ctx.Done()
	})
	failpoint.Inject("retryableError", func() {
		failpoint.Return(common.ErrWriteTooSlow)
	})

	remoteChecksum, err := getRemoteChecksumFn()
	if err != nil {
		if plan.Checksum != config.OpLevelOptional {
			return err
		}
		logger.Warn("get remote checksum failed, will skip this error and go on", zap.Error(err))
	}
	if remoteChecksum != nil {
		if !remoteChecksum.IsEqual(&localChecksum) {
			err2 := common.ErrChecksumMismatch.GenWithStackByArgs(
				remoteChecksum.Checksum, localChecksum.Sum(),
				remoteChecksum.TotalKVs, localChecksum.SumKVS(),
				remoteChecksum.TotalBytes, localChecksum.SumSize(),
			)
			if plan.Checksum == config.OpLevelOptional {
				logger.Warn("verify checksum failed, but checksum is optional, will skip it", zap.Error(err2))
				err2 = nil
			}
			return err2
		}
		logger.Info("checksum pass", zap.Object("local", &localChecksum))
	}
	return nil
}

// RemoteChecksumTableBySQL executes the SQL to get the remote checksum of the table.
func RemoteChecksumTableBySQL(ctx context.Context, se sessionctx.Context, plan *Plan, logger *zap.Logger) (*local.RemoteChecksum, error) {
	var (
		tableName                    = common.UniqueTable(plan.DBName, plan.TableInfo.Name.L)
		sql                          = "ADMIN CHECKSUM TABLE " + tableName
		maxErrorRetryCount           = 3
		distSQLScanConcurrencyFactor = 1
		remoteChecksum               *local.RemoteChecksum
		txnErr                       error
		doneCh                       = make(chan struct{})
	)
	checkCtx, cancel := context.WithCancel(ctx)
	defer func() {
		cancel()
		<-doneCh
	}()

	go func() {
		<-checkCtx.Done()
		se.GetSessionVars().SQLKiller.SendKillSignal(sqlkiller.QueryInterrupted)
		close(doneCh)
	}()

	distSQLScanConcurrencyBak := se.GetSessionVars().DistSQLScanConcurrency()
	defer func() {
		se.GetSessionVars().SetDistSQLScanConcurrency(distSQLScanConcurrencyBak)
	}()
	ctx = util.WithInternalSourceType(checkCtx, tidbkv.InternalImportInto)
	for i := range maxErrorRetryCount {
		txnErr = func() error {
			// increase backoff weight
			backoffWeight := GetBackoffWeight(plan)
			logger.Info("set backoff weight", zap.Int("weight", backoffWeight))
			err := se.GetSessionVars().SetSystemVar(vardef.TiDBBackOffWeight, strconv.Itoa(backoffWeight))
			if err != nil {
				logger.Warn("set tidb_backoff_weight failed", zap.Error(err))
			}

			newConcurrency := max(plan.DistSQLScanConcurrency/distSQLScanConcurrencyFactor, local.MinDistSQLScanConcurrency)
			logger.Info("checksum with adjusted distsql scan concurrency", zap.Int("concurrency", newConcurrency))
			se.GetSessionVars().SetDistSQLScanConcurrency(newConcurrency)

			// TODO: add resource group name
			rs, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), sql)
			if err != nil {
				return err
			}
			if len(rs) < 1 {
				return errors.New("empty checksum result")
			}

			failpoint.Inject("errWhenChecksum", func() {
				failpoint.Return(errors.New("occur an error when checksum, coprocessor task terminated due to exceeding the deadline"))
			})

			// ADMIN CHECKSUM TABLE example:
			// mysql> admin checksum table test.t;
			// +---------+------------+---------------------+-----------+-------------+
			// | Db_name | Table_name | Checksum_crc64_xor  | Total_kvs | Total_bytes |
			// +---------+------------+---------------------+-----------+-------------+
			// | test    | t          | 8520875019404689597 |   7296873 |   357601387 |
			// +---------+------------+---------------------+-----------+-------------+
			remoteChecksum = &local.RemoteChecksum{
				Schema:     rs[0].GetString(0),
				Table:      rs[0].GetString(1),
				Checksum:   rs[0].GetUint64(2),
				TotalKVs:   rs[0].GetUint64(3),
				TotalBytes: rs[0].GetUint64(4),
			}
			return nil
		}()
		if !common.IsRetryableError(txnErr) {
			break
		}
		distSQLScanConcurrencyFactor *= 2
		logger.Warn("retry checksum table", zap.Int("retry count", i+1), zap.Error(txnErr))
	}
	return remoteChecksum, txnErr
}

// GetBackoffWeight returns the backoff weight for the plan.
// returns max(local.DefaultBackoffWeight, plan.ImportantSysVars[vardef.TiDBBackOffWeight])
func GetBackoffWeight(plan *Plan) int {
	backoffWeight := local.DefaultBackoffWeight
	if val, ok := plan.ImportantSysVars[vardef.TiDBBackOffWeight]; ok {
		if weight, err := strconv.Atoi(val); err == nil && weight > backoffWeight {
			backoffWeight = weight
		}
	}
	return backoffWeight
}

// GetImportRootDir returns the root directory for import.
// The directory structure is like:
//
//	-> /path/to/tidb-tmpdir
//	  -> import-4000
//	    -> 1
//	    -> some-uuid
//
// exported for testing.
func GetImportRootDir(tidbCfg *tidb.Config) string {
	sortPathSuffix := "import-" + strconv.Itoa(int(tidbCfg.Port))
	return filepath.Join(tidbCfg.TempDir, sortPathSuffix)
}

// FlushTableStats flushes the stats of the table.
// stats will be stored in the stats collector and applied to mysql.stats_meta
// in domain.UpdateTableStatsLoop with a random interval between [1, 2) minutes.
// These stats stay in memory until the next flush, so they might be lost if the tidb-server restarts.
func FlushTableStats(ctx context.Context, se sessionctx.Context, tableID int64, importedRows int64) error {
	if err := sessiontxn.NewTxn(ctx, se); err != nil {
		return err
	}
	exec := statshandle.AttachStatsCollector(se.GetSQLExecutor())
	defer statshandle.DetachStatsCollector(exec)
	sessionVars := se.GetSessionVars()
	sessionVars.TxnCtxMu.Lock()
	defer sessionVars.TxnCtxMu.Unlock()
	sessionVars.TxnCtx.UpdateDeltaForTable(tableID, importedRows, importedRows)
	se.StmtCommit(ctx)
	return se.CommitTxn(ctx)
}
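// illustrateVerifyChecksumStub is an illustrative sketch only and is not used
// by the importer. It shows how VerifyChecksum's callback lets callers plug in
// any remote checksum source; here a stub always reports the same values as
// the local checksum, so verification passes regardless of the cluster state.
func illustrateVerifyChecksumStub(ctx context.Context, plan *Plan, localChecksum verify.KVChecksum, logger *zap.Logger) error {
	return VerifyChecksum(ctx, plan, localChecksum, logger, func() (*local.RemoteChecksum, error) {
		// a remote checksum that mirrors the local one; a real caller would run
		// ADMIN CHECKSUM TABLE via RemoteChecksumTableBySQL instead.
		return &local.RemoteChecksum{
			Checksum:   localChecksum.Sum(),
			TotalKVs:   localChecksum.SumKVS(),
			TotalBytes: localChecksum.SumSize(),
		}, nil
	})
}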