lightning: some refactor about id allocation (#41994)
ref pingcap/tidb#40499
@@ -383,6 +383,7 @@ func (kvcodec *tableKVEncoder) Encode(
         if kvcodec.isAutoRandomCol(col.ToInfo()) {
             shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits)
+            // this allocator is the same as the allocator in table importer, i.e. PanickingAllocators. below too.
             alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoRandomType)
             if err := alloc.Rebase(context.Background(), value.GetInt64()&shardFmt.IncrementalMask(), false); err != nil {
                 return nil, errors.Trace(err)
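The comment added here points at the PanickingAllocators set up by the table importer; the rebase keeps that allocator in step with values read from the data file by masking off the shard bits first. Below is a minimal, self-contained sketch of what such an incremental mask looks like, assuming the usual auto_random layout (optional sign bit, shard bits, then incremental bits). The incrementalMask helper is a stand-in written for this note, not the actual autoid.ShardIDFormat implementation.

package main

import "fmt"

// incrementalMask mimics what a shard-ID format's IncrementalMask is expected to
// return: a mask covering only the incremental (non-shard) bits of an auto_random id.
// Assumed layout: rangeBits total, minus one sign bit for signed columns, minus shardBits.
func incrementalMask(shardBits, rangeBits uint64, signed bool) int64 {
    incrBits := rangeBits - shardBits
    if signed {
        incrBits-- // the highest bit is reserved for the sign
    }
    return (1 << incrBits) - 1
}

func main() {
    // e.g. AUTO_RANDOM(5) on a signed BIGINT: 64 range bits, 5 shard bits.
    mask := incrementalMask(5, 64, true)
    encoded := int64(0x1234_5678_9ABC) // some value read from the data file
    // Only the incremental part is used to rebase the allocator.
    fmt.Printf("rebase to %d (mask %#x)\n", encoded&mask, mask)
}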
@@ -207,34 +207,34 @@ func (tr *TableImporter) Close() {

 func (tr *TableImporter) populateChunks(ctx context.Context, rc *Controller, cp *checkpoints.TableCheckpoint) error {
     task := tr.logger.Begin(zap.InfoLevel, "load engines and files")
-    chunks, err := mydump.MakeTableRegions(ctx, tr.tableMeta, len(tr.tableInfo.Core.Columns), rc.cfg, rc.ioWorkers, rc.store)
+    tableRegions, err := mydump.MakeTableRegions(ctx, tr.tableMeta, len(tr.tableInfo.Core.Columns), rc.cfg, rc.ioWorkers, rc.store)
     if err == nil {
         timestamp := time.Now().Unix()
         failpoint.Inject("PopulateChunkTimestamp", func(v failpoint.Value) {
             timestamp = int64(v.(int))
         })
-        for _, chunk := range chunks {
-            engine, found := cp.Engines[chunk.EngineID]
+        for _, region := range tableRegions {
+            engine, found := cp.Engines[region.EngineID]
             if !found {
                 engine = &checkpoints.EngineCheckpoint{
                     Status: checkpoints.CheckpointStatusLoaded,
                 }
-                cp.Engines[chunk.EngineID] = engine
+                cp.Engines[region.EngineID] = engine
             }
             ccp := &checkpoints.ChunkCheckpoint{
                 Key: checkpoints.ChunkCheckpointKey{
-                    Path:   chunk.FileMeta.Path,
-                    Offset: chunk.Chunk.Offset,
+                    Path:   region.FileMeta.Path,
+                    Offset: region.Chunk.Offset,
                 },
-                FileMeta:          chunk.FileMeta,
+                FileMeta:          region.FileMeta,
                 ColumnPermutation: nil,
-                Chunk:             chunk.Chunk,
+                Chunk:             region.Chunk,
                 Timestamp:         timestamp,
             }
-            if len(chunk.Chunk.Columns) > 0 {
+            if len(region.Chunk.Columns) > 0 {
                 perms, err := parseColumnPermutations(
                     tr.tableInfo.Core,
-                    chunk.Chunk.Columns,
+                    region.Chunk.Columns,
                     tr.ignoreColumns,
                     log.FromContext(ctx))
                 if err != nil {
@@ -250,7 +250,7 @@ func (tr *TableImporter) populateChunks(ctx context.Context, rc *Controller, cp
     }
     task.End(zap.ErrorLevel, err,
         zap.Int("enginesCnt", len(cp.Engines)),
-        zap.Int("filesCnt", len(chunks)),
+        zap.Int("filesCnt", len(tableRegions)),
     )
     return err
 }
@@ -661,11 +661,6 @@ ChunkLoop:
         // 2. sql -> kvs
         // 3. load kvs data (into kv deliver server)
         // 4. flush kvs data (into tikv node)
-        cr, err := newChunkProcessor(ctx, chunkIndex, rc.cfg, chunk, rc.ioWorkers, rc.store, tr.tableInfo)
-        if err != nil {
-            setError(err)
-            break
-        }
         var remainChunkCnt float64
         if chunk.Chunk.Offset < chunk.Chunk.EndOffset {
             remainChunkCnt = float64(chunk.UnfinishedSize()) / float64(chunk.TotalSize())
@@ -676,7 +671,6 @@ ChunkLoop:

         dataWriter, err := dataEngine.LocalWriter(ctx, dataWriterCfg)
         if err != nil {
-            cr.close()
             setError(err)
             break
         }
@@ -684,7 +678,11 @@ ChunkLoop:
         indexWriter, err := indexEngine.LocalWriter(ctx, &backend.LocalWriterConfig{})
         if err != nil {
             _, _ = dataWriter.Close(ctx)
-            cr.close()
             setError(err)
             break
         }
+        cr, err := newChunkProcessor(ctx, chunkIndex, rc.cfg, chunk, rc.ioWorkers, rc.store, tr.tableInfo)
+        if err != nil {
+            setError(err)
+            break
+        }
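Taken together, the three hunks above move the creation of the chunk processor from before the local writers to after them, so the writers' error paths no longer need to close a processor that does not exist yet. A rough, heavily simplified sketch of the resulting ordering follows; the resource type and the open*/newChunkProcessor helpers are placeholders invented for this note, not real Lightning APIs, and the error handling is condensed compared with the real chunk loop (which reports via setError and breaks).

package main

import (
    "errors"
    "fmt"
)

// resource stands in for Lightning's local writers and chunk processor.
type resource struct{ name string }

func (r *resource) close() { fmt.Println("closed", r.name) }

func openDataWriter() (*resource, error)    { return &resource{"data writer"}, nil }
func openIndexWriter() (*resource, error)   { return &resource{"index writer"}, nil }
func newChunkProcessor() (*resource, error) { return nil, errors.New("cannot open chunk") }

func processChunk() error {
    dataWriter, err := openDataWriter()
    if err != nil {
        return err // nothing created yet, nothing to close
    }
    indexWriter, err := openIndexWriter()
    if err != nil {
        dataWriter.close() // only the data writer needs cleanup here
        return err
    }
    // Creating the processor last means the two error paths above no longer
    // have to call cr.close().
    cr, err := newChunkProcessor()
    if err != nil {
        dataWriter.close()
        indexWriter.close()
        return err
    }
    defer cr.close()
    defer indexWriter.close()
    defer dataWriter.close()
    fmt.Println("encode and deliver the chunk")
    return nil
}

func main() {
    if err := processChunk(); err != nil {
        fmt.Println("chunk failed:", err)
    }
}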
@@ -93,12 +93,18 @@ type ChunkParser struct {

 // Chunk represents a portion of the data file.
 type Chunk struct {
-    Offset       int64
-    EndOffset    int64
-    RealOffset   int64
+    Offset     int64
+    EndOffset  int64
+    RealOffset int64
+    // we estimate row-id range of the chunk using file-size divided by some factor(depends on column count)
+    // after estimation, we will rebase them for all chunks of this table in this instance,
+    // then it's rebased again based on all instances of parallel import.
+    // allocatable row-id is in range [PrevRowIDMax, RowIDMax).
+    // PrevRowIDMax will be increased during local encoding
     PrevRowIDMax int64
     RowIDMax     int64
-    Columns      []string
+    // only assigned when using strict-mode for CSV files and the file contains header
+    Columns []string
 }

 // Row is the content of a row.
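The new comments describe [PrevRowIDMax, RowIDMax) as a half-open, pre-estimated window of row IDs that local encoding consumes from, with PrevRowIDMax advancing as rows are encoded. A toy illustration of that contract, using a local stand-in type rather than the real mydump.Chunk:

package main

import "fmt"

// chunk is a stand-in for the mydump.Chunk fields discussed above.
type chunk struct {
    PrevRowIDMax int64 // next row id to hand out
    RowIDMax     int64 // exclusive upper bound of the allocatable range
}

// nextRowID consumes one row id from the half-open range [PrevRowIDMax, RowIDMax),
// mirroring "PrevRowIDMax will be increased during local encoding".
func (c *chunk) nextRowID() (int64, bool) {
    if c.PrevRowIDMax >= c.RowIDMax {
        return 0, false // the estimated range is exhausted
    }
    id := c.PrevRowIDMax
    c.PrevRowIDMax++
    return id, true
}

func main() {
    c := &chunk{PrevRowIDMax: 100, RowIDMax: 103}
    for {
        id, ok := c.nextRowID()
        if !ok {
            break
        }
        fmt.Println("assigned row id", id)
    }
}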
@@ -146,6 +146,7 @@ func AllocateEngineIDs(
 }

 // MakeTableRegions create a new table region.
+// row-id range of returned TableRegion is increasing monotonically
 func MakeTableRegions(
     ctx context.Context,
     meta *MDTableMeta,
@@ -226,21 +227,17 @@ func MakeTableRegions(

     filesRegions := make([]*TableRegion, 0, len(meta.DataFiles))
     dataFileSizes := make([]float64, 0, len(meta.DataFiles))
-    prevRowIDMax := int64(0)
+    // rebase row-id for all chunk
+    rowIDBase := int64(0)
     for _, dataFile := range meta.DataFiles {
         fileRegionsRes := fileRegionsMap[dataFile.FileMeta.Path]
-        var delta int64
-        if len(fileRegionsRes.regions) > 0 {
-            delta = prevRowIDMax - fileRegionsRes.regions[0].Chunk.PrevRowIDMax
-        }
-
         for _, region := range fileRegionsRes.regions {
-            region.Chunk.PrevRowIDMax += delta
-            region.Chunk.RowIDMax += delta
+            region.Chunk.PrevRowIDMax += rowIDBase
+            region.Chunk.RowIDMax += rowIDBase
         }
         filesRegions = append(filesRegions, fileRegionsRes.regions...)
         dataFileSizes = append(dataFileSizes, fileRegionsRes.sizes...)
-        prevRowIDMax = fileRegionsRes.regions[len(fileRegionsRes.regions)-1].Chunk.RowIDMax
+        rowIDBase = fileRegionsRes.regions[len(fileRegionsRes.regions)-1].Chunk.RowIDMax
     }

     batchSize := float64(cfg.Mydumper.BatchSize)
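Because every file's regions now carry file-local row IDs that start at zero (see the parquet and SplitLargeFile hunks further down), the table-level pass no longer needs a per-file delta; it simply adds a running base to each file's regions and then advances the base to the last region's RowIDMax. A condensed sketch of that rebase pass with simplified types:

package main

import "fmt"

// region is a stand-in for mydump.TableRegion with only the fields that matter here.
type region struct {
    File                   string
    PrevRowIDMax, RowIDMax int64 // file-local range produced per data file
}

func main() {
    // Two data files whose regions were each numbered starting from 0.
    files := [][]*region{
        {{"a.csv", 0, 40}, {"a.csv", 40, 80}},
        {{"b.csv", 0, 25}},
    }

    // rebase row-id for all chunks: shift every file's regions by the running
    // base, then advance the base to the last region's RowIDMax.
    rowIDBase := int64(0)
    var all []*region
    for _, regs := range files {
        for _, r := range regs {
            r.PrevRowIDMax += rowIDBase
            r.RowIDMax += rowIDBase
        }
        all = append(all, regs...)
        rowIDBase = regs[len(regs)-1].RowIDMax
    }

    // The resulting ranges increase monotonically across the whole table:
    // a.csv: [0, 40), a.csv: [40, 80), b.csv: [80, 105).
    for _, r := range all {
        fmt.Printf("%s: [%d, %d)\n", r.File, r.PrevRowIDMax, r.RowIDMax)
    }
}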
@@ -272,7 +269,7 @@ func MakeSourceFileRegion(
     store storage.ExternalStorage,
 ) ([]*TableRegion, []float64, error) {
     if fi.FileMeta.Type == SourceTypeParquet {
-        _, region, err := makeParquetFileRegion(ctx, store, meta, fi, 0)
+        region, err := makeParquetFileRegion(ctx, store, meta, fi)
         if err != nil {
             return nil, nil, err
         }
@@ -293,7 +290,7 @@ func MakeSourceFileRegion(
     // If a csv file is compressed, we can't split it now because we can't get the exact size of a row.
     if isCsvFile && cfg.Mydumper.StrictFormat && fi.FileMeta.Compression == CompressionNone &&
         dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
-        _, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
+        regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, ioWorkers, store)
         return regions, subFileSizes, err
     }

@@ -341,17 +338,15 @@ func makeParquetFileRegion(
     store storage.ExternalStorage,
     meta *MDTableMeta,
     dataFile FileInfo,
-    prevRowIdxMax int64,
-) (int64, *TableRegion, error) {
+) (*TableRegion, error) {
     r, err := store.Open(ctx, dataFile.FileMeta.Path)
     if err != nil {
-        return prevRowIdxMax, nil, errors.Trace(err)
+        return nil, errors.Trace(err)
     }
     numberRows, err := ReadParquetFileRowCount(ctx, store, r, dataFile.FileMeta.Path)
     if err != nil {
-        return 0, nil, errors.Trace(err)
+        return nil, errors.Trace(err)
     }
-    rowIDMax := prevRowIdxMax + numberRows
     region := &TableRegion{
         DB:    meta.DB,
         Table: meta.Name,
@@ -359,11 +354,11 @@ func makeParquetFileRegion(
         Chunk: Chunk{
             Offset:       0,
             EndOffset:    numberRows,
-            PrevRowIDMax: prevRowIdxMax,
-            RowIDMax:     rowIDMax,
+            PrevRowIDMax: 0,
+            RowIDMax:     numberRows,
         },
     }
-    return rowIDMax, region, nil
+    return region, nil
 }

 // SplitLargeFile splits a large csv file into multiple regions, the size of
@@ -379,30 +374,30 @@ func SplitLargeFile(
     cfg *config.Config,
     dataFile FileInfo,
     divisor int64,
-    prevRowIdxMax int64,
     ioWorker *worker.Pool,
     store storage.ExternalStorage,
-) (prevRowIDMax int64, regions []*TableRegion, dataFileSizes []float64, err error) {
+) (regions []*TableRegion, dataFileSizes []float64, err error) {
     maxRegionSize := int64(cfg.Mydumper.MaxRegionSize)
     dataFileSizes = make([]float64, 0, dataFile.FileMeta.FileSize/maxRegionSize+1)
     startOffset, endOffset := int64(0), maxRegionSize
     var columns []string
+    var prevRowIdxMax int64
     if cfg.Mydumper.CSV.Header {
         r, err := store.Open(ctx, dataFile.FileMeta.Path)
         if err != nil {
-            return 0, nil, nil, err
+            return nil, nil, err
         }
         // Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
         charsetConvertor, err := NewCharsetConvertor(cfg.Mydumper.DataCharacterSet, cfg.Mydumper.DataInvalidCharReplace)
         if err != nil {
-            return 0, nil, nil, err
+            return nil, nil, err
         }
         parser, err := NewCSVParser(ctx, &cfg.Mydumper.CSV, r, int64(cfg.Mydumper.ReadBlockSize), ioWorker, true, charsetConvertor)
         if err != nil {
-            return 0, nil, nil, err
+            return nil, nil, err
         }
         if err = parser.ReadColumns(); err != nil {
-            return 0, nil, nil, err
+            return nil, nil, err
         }
         if cfg.Mydumper.CSV.HeaderSchemaMatch {
             columns = parser.Columns()
@@ -419,24 +414,24 @@ func SplitLargeFile(
         if endOffset != dataFile.FileMeta.FileSize {
             r, err := store.Open(ctx, dataFile.FileMeta.Path)
             if err != nil {
-                return 0, nil, nil, err
+                return nil, nil, err
             }
             // Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
             charsetConvertor, err := NewCharsetConvertor(cfg.Mydumper.DataCharacterSet, cfg.Mydumper.DataInvalidCharReplace)
             if err != nil {
-                return 0, nil, nil, err
+                return nil, nil, err
             }
             parser, err := NewCSVParser(ctx, &cfg.Mydumper.CSV, r, int64(cfg.Mydumper.ReadBlockSize), ioWorker, false, charsetConvertor)
             if err != nil {
-                return 0, nil, nil, err
+                return nil, nil, err
             }
-            if err = parser.SetPos(endOffset, prevRowIDMax); err != nil {
-                return 0, nil, nil, err
+            if err = parser.SetPos(endOffset, 0); err != nil {
+                return nil, nil, err
             }
             _, pos, err := parser.ReadUntilTerminator()
             if err != nil {
                 if !errors.ErrorEqual(err, io.EOF) {
-                    return 0, nil, nil, err
+                    return nil, nil, err
                 }
                 log.FromContext(ctx).Warn("file contains no terminator at end",
                     zap.String("path", dataFile.FileMeta.Path),
@@ -469,5 +464,5 @@ func SplitLargeFile(
             endOffset = dataFile.FileMeta.FileSize
         }
     }
-    return prevRowIdxMax, regions, dataFileSizes, nil
+    return regions, dataFileSizes, nil
 }
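SplitLargeFile now numbers rows relative to the start of its own file (prevRowIdxMax is a local that starts at zero) and leaves the table-wide rebase to MakeTableRegions. The row-id range of each split region is only an estimate derived from its byte size, as the Chunk comment above notes; the sketch below shows the general shape of such an estimate, where the divisor value and the +1 slack are assumptions made for illustration rather than the verbatim Lightning formula.

package main

import "fmt"

// estimateRowIDMax mimics the shape of the per-region row-id estimate: a region
// covering [startOffset, endOffset) bytes is assumed to hold at most roughly
// (endOffset-startOffset)/divisor rows, where divisor depends on the column count.
func estimateRowIDMax(prevRowIDMax, startOffset, endOffset, divisor int64) int64 {
    return prevRowIDMax + (endOffset-startOffset)/divisor + 1
}

func main() {
    const (
        fileSize      = int64(1000)
        maxRegionSize = int64(400)
        divisor       = int64(20) // pretend a row needs at least ~20 bytes here
    )
    prev := int64(0) // file-local numbering: every split file now starts at 0
    for start := int64(0); start < fileSize; start += maxRegionSize {
        end := start + maxRegionSize
        if end > fileSize {
            end = fileSize
        }
        rowIDMax := estimateRowIDMax(prev, start, end, divisor)
        fmt.Printf("bytes [%d, %d) -> row ids [%d, %d)\n", start, end, prev, rowIDMax)
        prev = rowIDMax
    }
}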
@@ -319,13 +319,12 @@ func TestSplitLargeFile(t *testing.T) {
         {19, [][]int64{{6, 30}}},
     } {
         cfg.Mydumper.MaxRegionSize = tc.maxRegionSize
-        prevRowIdxMax := int64(0)
         ioWorker := worker.NewPool(context.Background(), 4, "io")

         store, err := storage.NewLocalStorage(".")
         assert.NoError(t, err)

-        _, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
+        regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, ioWorker, store)
         assert.NoError(t, err)
         assert.Len(t, regions, len(tc.offsets))
         for i := range tc.offsets {
@@ -375,7 +374,6 @@ func TestSplitLargeFileNoNewLineAtEOF(t *testing.T) {
     fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
     colCnt := int64(2)
     columns := []string{"a", "b"}
-    prevRowIdxMax := int64(0)
     ioWorker := worker.NewPool(context.Background(), 4, "io")

     store, err := storage.NewLocalStorage(dir)
@@ -383,7 +381,7 @@ func TestSplitLargeFileNoNewLineAtEOF(t *testing.T) {

     offsets := [][]int64{{4, 13}, {13, 21}}

-    _, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
+    regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, ioWorker, store)
     require.NoError(t, err)
     require.Len(t, regions, len(offsets))
     for i := range offsets {
@@ -425,7 +423,6 @@ func TestSplitLargeFileWithCustomTerminator(t *testing.T) {
     fileSize := dataFileInfo.Size()
     fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
     colCnt := int64(3)
-    prevRowIdxMax := int64(0)
     ioWorker := worker.NewPool(context.Background(), 4, "io")

     store, err := storage.NewLocalStorage(dir)
@@ -433,7 +430,7 @@ func TestSplitLargeFileWithCustomTerminator(t *testing.T) {

     offsets := [][]int64{{0, 23}, {23, 38}, {38, 47}}

-    _, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
+    regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, ioWorker, store)
     require.NoError(t, err)
     require.Len(t, regions, len(offsets))
     for i := range offsets {
@@ -481,7 +478,6 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) {
     fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
     colCnt := int64(2)
     columns := []string{"field1", "field2"}
-    prevRowIdxMax := int64(0)
    ioWorker := worker.NewPool(context.Background(), 4, "io")

     store, err := storage.NewLocalStorage(dir)
@@ -489,7 +485,7 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) {

     offsets := [][]int64{{14, 24}}

-    _, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
+    regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, ioWorker, store)
     require.NoError(t, err)
     require.Len(t, regions, len(offsets))
     for i := range offsets {