diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go
index 905a0fd4eb..1c954fc94f 100644
--- a/br/pkg/lightning/backend/kv/sql2kv.go
+++ b/br/pkg/lightning/backend/kv/sql2kv.go
@@ -383,6 +383,7 @@ func (kvcodec *tableKVEncoder) Encode(
 		if kvcodec.isAutoRandomCol(col.ToInfo()) {
 			shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits)
+			// This allocator is the same as the one used in the table importer, i.e. PanickingAllocators; same below.
 			alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoRandomType)
 			if err := alloc.Rebase(context.Background(), value.GetInt64()&shardFmt.IncrementalMask(), false); err != nil {
 				return nil, errors.Trace(err)
 			}
diff --git a/br/pkg/lightning/importer/table_import.go b/br/pkg/lightning/importer/table_import.go
index dc40f6abad..124e8e9c88 100644
--- a/br/pkg/lightning/importer/table_import.go
+++ b/br/pkg/lightning/importer/table_import.go
@@ -207,34 +207,34 @@ func (tr *TableImporter) Close() {
 func (tr *TableImporter) populateChunks(ctx context.Context, rc *Controller, cp *checkpoints.TableCheckpoint) error {
 	task := tr.logger.Begin(zap.InfoLevel, "load engines and files")
-	chunks, err := mydump.MakeTableRegions(ctx, tr.tableMeta, len(tr.tableInfo.Core.Columns), rc.cfg, rc.ioWorkers, rc.store)
+	tableRegions, err := mydump.MakeTableRegions(ctx, tr.tableMeta, len(tr.tableInfo.Core.Columns), rc.cfg, rc.ioWorkers, rc.store)
 	if err == nil {
 		timestamp := time.Now().Unix()
 		failpoint.Inject("PopulateChunkTimestamp", func(v failpoint.Value) {
 			timestamp = int64(v.(int))
 		})
-		for _, chunk := range chunks {
-			engine, found := cp.Engines[chunk.EngineID]
+		for _, region := range tableRegions {
+			engine, found := cp.Engines[region.EngineID]
 			if !found {
 				engine = &checkpoints.EngineCheckpoint{
 					Status: checkpoints.CheckpointStatusLoaded,
 				}
-				cp.Engines[chunk.EngineID] = engine
+				cp.Engines[region.EngineID] = engine
 			}
 			ccp := &checkpoints.ChunkCheckpoint{
 				Key: checkpoints.ChunkCheckpointKey{
-					Path:   chunk.FileMeta.Path,
-					Offset: chunk.Chunk.Offset,
+					Path:   region.FileMeta.Path,
+					Offset: region.Chunk.Offset,
 				},
-				FileMeta:          chunk.FileMeta,
+				FileMeta:          region.FileMeta,
 				ColumnPermutation: nil,
-				Chunk:             chunk.Chunk,
+				Chunk:             region.Chunk,
 				Timestamp:         timestamp,
 			}
-			if len(chunk.Chunk.Columns) > 0 {
+			if len(region.Chunk.Columns) > 0 {
 				perms, err := parseColumnPermutations(
 					tr.tableInfo.Core,
-					chunk.Chunk.Columns,
+					region.Chunk.Columns,
 					tr.ignoreColumns,
 					log.FromContext(ctx))
 				if err != nil {
@@ -250,7 +250,7 @@ func (tr *TableImporter) populateChunks(ctx context.Context, rc *Controller, cp
 	}
 	task.End(zap.ErrorLevel, err,
 		zap.Int("enginesCnt", len(cp.Engines)),
-		zap.Int("filesCnt", len(chunks)),
+		zap.Int("filesCnt", len(tableRegions)),
 	)
 	return err
 }
@@ -661,11 +661,6 @@ ChunkLoop:
 		// 2. sql -> kvs
 		// 3. load kvs data (into kv deliver server)
 		// 4. flush kvs data (into tikv node)
-		cr, err := newChunkProcessor(ctx, chunkIndex, rc.cfg, chunk, rc.ioWorkers, rc.store, tr.tableInfo)
-		if err != nil {
-			setError(err)
-			break
-		}
 		var remainChunkCnt float64
 		if chunk.Chunk.Offset < chunk.Chunk.EndOffset {
 			remainChunkCnt = float64(chunk.UnfinishedSize()) / float64(chunk.TotalSize())
@@ -676,7 +671,6 @@ ChunkLoop:
 
 		dataWriter, err := dataEngine.LocalWriter(ctx, dataWriterCfg)
 		if err != nil {
-			cr.close()
 			setError(err)
 			break
 		}
@@ -684,7 +678,11 @@ ChunkLoop:
 		indexWriter, err := indexEngine.LocalWriter(ctx, &backend.LocalWriterConfig{})
 		if err != nil {
 			_, _ = dataWriter.Close(ctx)
-			cr.close()
+			setError(err)
+			break
+		}
+		cr, err := newChunkProcessor(ctx, chunkIndex, rc.cfg, chunk, rc.ioWorkers, rc.store, tr.tableInfo)
+		if err != nil {
 			setError(err)
 			break
 		}
diff --git a/br/pkg/lightning/mydump/parser.go b/br/pkg/lightning/mydump/parser.go
index c99330a962..d55575900f 100644
--- a/br/pkg/lightning/mydump/parser.go
+++ b/br/pkg/lightning/mydump/parser.go
@@ -93,12 +93,18 @@ type ChunkParser struct {
 
 // Chunk represents a portion of the data file.
 type Chunk struct {
-	Offset       int64
-	EndOffset    int64
-	RealOffset   int64
+	Offset     int64
+	EndOffset  int64
+	RealOffset int64
+	// We estimate the row-id range of a chunk from the file size divided by a factor that depends on the column count.
+	// After estimation, the ranges are rebased across all chunks of this table in this instance,
+	// and then rebased again across all instances of a parallel import.
+	// The allocatable row-id range is [PrevRowIDMax, RowIDMax).
+	// PrevRowIDMax is advanced during local encoding.
 	PrevRowIDMax int64
 	RowIDMax     int64
-	Columns      []string
+	// Columns is only assigned when strict-format is enabled for CSV files and the file contains a header.
+	Columns []string
 }
 
 // Row is the content of a row.
diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go
index d57a2c4742..ac95e515c2 100644
--- a/br/pkg/lightning/mydump/region.go
+++ b/br/pkg/lightning/mydump/region.go
@@ -146,6 +146,7 @@ func AllocateEngineIDs(
 }
 
 // MakeTableRegions create a new table region.
+// The row-id ranges of the returned TableRegions increase monotonically.
 func MakeTableRegions(
 	ctx context.Context,
 	meta *MDTableMeta,
@@ -226,21 +227,17 @@ func MakeTableRegions(
 
 	filesRegions := make([]*TableRegion, 0, len(meta.DataFiles))
 	dataFileSizes := make([]float64, 0, len(meta.DataFiles))
-	prevRowIDMax := int64(0)
+	// rebase row-ids for all chunks
+	rowIDBase := int64(0)
 	for _, dataFile := range meta.DataFiles {
 		fileRegionsRes := fileRegionsMap[dataFile.FileMeta.Path]
-		var delta int64
-		if len(fileRegionsRes.regions) > 0 {
-			delta = prevRowIDMax - fileRegionsRes.regions[0].Chunk.PrevRowIDMax
-		}
-
 		for _, region := range fileRegionsRes.regions {
-			region.Chunk.PrevRowIDMax += delta
-			region.Chunk.RowIDMax += delta
+			region.Chunk.PrevRowIDMax += rowIDBase
+			region.Chunk.RowIDMax += rowIDBase
 		}
 		filesRegions = append(filesRegions, fileRegionsRes.regions...)
 		dataFileSizes = append(dataFileSizes, fileRegionsRes.sizes...)
-		prevRowIDMax = fileRegionsRes.regions[len(fileRegionsRes.regions)-1].Chunk.RowIDMax
+		rowIDBase = fileRegionsRes.regions[len(fileRegionsRes.regions)-1].Chunk.RowIDMax
 	}
 
 	batchSize := float64(cfg.Mydumper.BatchSize)
@@ -272,7 +269,7 @@ func MakeSourceFileRegion(
 	store storage.ExternalStorage,
 ) ([]*TableRegion, []float64, error) {
 	if fi.FileMeta.Type == SourceTypeParquet {
-		_, region, err := makeParquetFileRegion(ctx, store, meta, fi, 0)
+		region, err := makeParquetFileRegion(ctx, store, meta, fi)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -293,7 +290,7 @@ func MakeSourceFileRegion(
 	// If a csv file is compressed, we can't split it now because we can't get the exact size of a row.
 	if isCsvFile && cfg.Mydumper.StrictFormat && fi.FileMeta.Compression == CompressionNone &&
 		dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
-		_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
+		regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, ioWorkers, store)
 		return regions, subFileSizes, err
 	}
@@ -341,17 +338,15 @@ func makeParquetFileRegion(
 	store storage.ExternalStorage,
 	meta *MDTableMeta,
 	dataFile FileInfo,
-	prevRowIdxMax int64,
-) (int64, *TableRegion, error) {
+) (*TableRegion, error) {
 	r, err := store.Open(ctx, dataFile.FileMeta.Path)
 	if err != nil {
-		return prevRowIdxMax, nil, errors.Trace(err)
+		return nil, errors.Trace(err)
 	}
 	numberRows, err := ReadParquetFileRowCount(ctx, store, r, dataFile.FileMeta.Path)
 	if err != nil {
-		return 0, nil, errors.Trace(err)
+		return nil, errors.Trace(err)
 	}
-	rowIDMax := prevRowIdxMax + numberRows
 	region := &TableRegion{
 		DB:    meta.DB,
 		Table: meta.Name,
@@ -359,11 +354,11 @@ func makeParquetFileRegion(
 		Chunk: Chunk{
 			Offset:       0,
 			EndOffset:    numberRows,
-			PrevRowIDMax: prevRowIdxMax,
-			RowIDMax:     rowIDMax,
+			PrevRowIDMax: 0,
+			RowIDMax:     numberRows,
 		},
 	}
-	return rowIDMax, region, nil
+	return region, nil
 }
 
 // SplitLargeFile splits a large csv file into multiple regions, the size of
@@ -379,30 +374,30 @@ func SplitLargeFile(
 	cfg *config.Config,
 	dataFile FileInfo,
 	divisor int64,
-	prevRowIdxMax int64,
 	ioWorker *worker.Pool,
 	store storage.ExternalStorage,
-) (prevRowIDMax int64, regions []*TableRegion, dataFileSizes []float64, err error) {
+) (regions []*TableRegion, dataFileSizes []float64, err error) {
 	maxRegionSize := int64(cfg.Mydumper.MaxRegionSize)
 	dataFileSizes = make([]float64, 0, dataFile.FileMeta.FileSize/maxRegionSize+1)
 	startOffset, endOffset := int64(0), maxRegionSize
 	var columns []string
+	var prevRowIdxMax int64
 	if cfg.Mydumper.CSV.Header {
 		r, err := store.Open(ctx, dataFile.FileMeta.Path)
 		if err != nil {
-			return 0, nil, nil, err
+			return nil, nil, err
 		}
 		// Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
 		charsetConvertor, err := NewCharsetConvertor(cfg.Mydumper.DataCharacterSet, cfg.Mydumper.DataInvalidCharReplace)
 		if err != nil {
-			return 0, nil, nil, err
+			return nil, nil, err
 		}
 		parser, err := NewCSVParser(ctx, &cfg.Mydumper.CSV, r, int64(cfg.Mydumper.ReadBlockSize), ioWorker, true, charsetConvertor)
 		if err != nil {
-			return 0, nil, nil, err
+			return nil, nil, err
 		}
 		if err = parser.ReadColumns(); err != nil {
-			return 0, nil, nil, err
+			return nil, nil, err
 		}
 		if cfg.Mydumper.CSV.HeaderSchemaMatch {
 			columns = parser.Columns()
@@ -419,24 +414,24 @@ func SplitLargeFile(
 		if endOffset != dataFile.FileMeta.FileSize {
 			r, err := store.Open(ctx, dataFile.FileMeta.Path)
 			if err != nil {
-				return 0, nil, nil, err
+				return nil, nil, err
 			}
 			// Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
 			charsetConvertor, err := NewCharsetConvertor(cfg.Mydumper.DataCharacterSet, cfg.Mydumper.DataInvalidCharReplace)
 			if err != nil {
-				return 0, nil, nil, err
+				return nil, nil, err
 			}
 			parser, err := NewCSVParser(ctx, &cfg.Mydumper.CSV, r, int64(cfg.Mydumper.ReadBlockSize), ioWorker, false, charsetConvertor)
 			if err != nil {
-				return 0, nil, nil, err
+				return nil, nil, err
 			}
-			if err = parser.SetPos(endOffset, prevRowIDMax); err != nil {
-				return 0, nil, nil, err
+			if err = parser.SetPos(endOffset, 0); err != nil {
+				return nil, nil, err
 			}
 			_, pos, err := parser.ReadUntilTerminator()
 			if err != nil {
 				if !errors.ErrorEqual(err, io.EOF) {
-					return 0, nil, nil, err
+					return nil, nil, err
 				}
 				log.FromContext(ctx).Warn("file contains no terminator at end",
 					zap.String("path", dataFile.FileMeta.Path),
@@ -469,5 +464,5 @@ func SplitLargeFile(
 			endOffset = dataFile.FileMeta.FileSize
 		}
 	}
-	return prevRowIdxMax, regions, dataFileSizes, nil
+	return regions, dataFileSizes, nil
 }
diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go
index ad23590f4c..3f2718532f 100644
--- a/br/pkg/lightning/mydump/region_test.go
+++ b/br/pkg/lightning/mydump/region_test.go
@@ -319,13 +319,12 @@ func TestSplitLargeFile(t *testing.T) {
 		{19, [][]int64{{6, 30}}},
 	} {
 		cfg.Mydumper.MaxRegionSize = tc.maxRegionSize
-		prevRowIdxMax := int64(0)
 		ioWorker := worker.NewPool(context.Background(), 4, "io")
 
 		store, err := storage.NewLocalStorage(".")
 		assert.NoError(t, err)
 
-		_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
+		regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, ioWorker, store)
 		assert.NoError(t, err)
 		assert.Len(t, regions, len(tc.offsets))
 		for i := range tc.offsets {
@@ -375,7 +374,6 @@ func TestSplitLargeFileNoNewLineAtEOF(t *testing.T) {
 	fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
 	colCnt := int64(2)
 	columns := []string{"a", "b"}
-	prevRowIdxMax := int64(0)
 	ioWorker := worker.NewPool(context.Background(), 4, "io")
 
 	store, err := storage.NewLocalStorage(dir)
@@ -383,7 +381,7 @@ func TestSplitLargeFileNoNewLineAtEOF(t *testing.T) {
 
 	offsets := [][]int64{{4, 13}, {13, 21}}
 
-	_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
+	regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, ioWorker, store)
 	require.NoError(t, err)
 	require.Len(t, regions, len(offsets))
 	for i := range offsets {
@@ -425,7 +423,6 @@ func TestSplitLargeFileWithCustomTerminator(t *testing.T) {
 	fileSize := dataFileInfo.Size()
 	fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
 	colCnt := int64(3)
-	prevRowIdxMax := int64(0)
 	ioWorker := worker.NewPool(context.Background(), 4, "io")
 
 	store, err := storage.NewLocalStorage(dir)
@@ -433,7 +430,7 @@ func TestSplitLargeFileWithCustomTerminator(t *testing.T) {
 
 	offsets := [][]int64{{0, 23}, {23, 38}, {38, 47}}
 
-	_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
+	regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, ioWorker, store)
 	require.NoError(t, err)
 	require.Len(t, regions, len(offsets))
 	for i := range offsets {
@@ -481,7 +478,6 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) {
 	fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
 	colCnt := int64(2)
 	columns := []string{"field1", "field2"}
-	prevRowIdxMax := int64(0)
 	ioWorker := worker.NewPool(context.Background(), 4, "io")
 
 	store, err := storage.NewLocalStorage(dir)
@@ -489,7 +485,7 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) {
 
 	offsets := [][]int64{{14, 24}}
 
-	_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
+	regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, ioWorker, store)
 	require.NoError(t, err)
 	require.Len(t, regions, len(offsets))
 	for i := range offsets {
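Note on the row-id handling above: the following is a minimal, self-contained Go sketch of the rebasing scheme that the new rowIDBase loop in MakeTableRegions implements; the chunk struct and rebaseChunks helper are illustrative stand-ins, not code from this patch. Each file's chunks start with row-id ranges local to that file, and a running base shifts them so the [PrevRowIDMax, RowIDMax) ranges of all returned regions increase monotonically across files.

```go
package main

import "fmt"

// chunk mirrors the [PrevRowIDMax, RowIDMax) bookkeeping described on mydump.Chunk.
type chunk struct {
	PrevRowIDMax int64
	RowIDMax     int64
}

// rebaseChunks shifts each file's locally-estimated row-id ranges by a running
// base so that the ranges of all returned chunks increase monotonically across
// files, in the spirit of the rowIDBase loop in MakeTableRegions.
func rebaseChunks(files [][]chunk) []chunk {
	var out []chunk
	rowIDBase := int64(0)
	for _, fileChunks := range files {
		for i := range fileChunks {
			fileChunks[i].PrevRowIDMax += rowIDBase
			fileChunks[i].RowIDMax += rowIDBase
		}
		out = append(out, fileChunks...)
		if len(fileChunks) > 0 {
			// The last chunk's RowIDMax becomes the base for the next file.
			rowIDBase = fileChunks[len(fileChunks)-1].RowIDMax
		}
	}
	return out
}

func main() {
	// Two files whose chunks were estimated independently, each starting at row-id 0.
	fileA := []chunk{{0, 10}, {10, 25}}
	fileB := []chunk{{0, 7}}
	fmt.Println(rebaseChunks([][]chunk{fileA, fileB})) // [{0 10} {10 25} {25 32}]
}
```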