diff --git a/br/pkg/lightning/backend/local/region_job.go b/br/pkg/lightning/backend/local/region_job.go
index 16288eaa44..f12766c5af 100644
--- a/br/pkg/lightning/backend/local/region_job.go
+++ b/br/pkg/lightning/backend/local/region_job.go
@@ -104,7 +104,7 @@ func (j *regionJob) convertStageTo(stage jobStageTp) {
         j.engine.importedKVSize.Add(j.writeResult.rangeStats.totalBytes)
         j.engine.importedKVCount.Add(j.writeResult.rangeStats.count)
         if j.metrics != nil {
-            j.metrics.BytesCounter.WithLabelValues(metric.BytesStateImported).
+            j.metrics.BytesCounter.WithLabelValues(metric.StateImported).
                 Add(float64(j.writeResult.rangeStats.totalBytes))
         }
     }
diff --git a/br/pkg/lightning/importer/BUILD.bazel b/br/pkg/lightning/importer/BUILD.bazel
index b6048a1bf6..7209638227 100644
--- a/br/pkg/lightning/importer/BUILD.bazel
+++ b/br/pkg/lightning/importer/BUILD.bazel
@@ -78,6 +78,7 @@ go_library(
         "@com_github_pingcap_kvproto//pkg/import_sstpb",
         "@com_github_pingcap_kvproto//pkg/metapb",
         "@com_github_pingcap_tipb//go-tipb",
+        "@com_github_prometheus_client_golang//prometheus",
         "@com_github_tikv_client_go_v2//oracle",
         "@com_github_tikv_pd_client//:client",
         "@io_etcd_go_etcd_client_v3//:client",
diff --git a/br/pkg/lightning/importer/chunk_process.go b/br/pkg/lightning/importer/chunk_process.go
index 7e69447e5e..8624bf6a6f 100644
--- a/br/pkg/lightning/importer/chunk_process.go
+++ b/br/pkg/lightning/importer/chunk_process.go
@@ -256,20 +256,24 @@ func (cr *chunkProcessor) encodeLoop(
         canDeliver := false
         kvPacket := make([]deliveredKVs, 0, maxKvPairsCnt)
         curOffset := offset
-        var newOffset, rowID, realOffset int64
+        var newOffset, rowID, newScannedOffset int64
+        var scannedOffset int64 = -1
         var kvSize uint64
-        var realOffsetErr error
+        var scannedOffsetErr error
     outLoop:
         for !canDeliver {
             readDurStart := time.Now()
             err = cr.parser.ReadRow()
             columnNames := cr.parser.Columns()
             newOffset, rowID = cr.parser.Pos()
-            if cr.chunk.FileMeta.Compression != mydump.CompressionNone {
-                realOffset, realOffsetErr = cr.parser.RealPos()
-                if realOffsetErr != nil {
-                    logger.Warn("fail to get data engine RealPos, progress may not be accurate",
-                        log.ShortError(realOffsetErr), zap.String("file", cr.chunk.FileMeta.Path))
+            if cr.chunk.FileMeta.Compression != mydump.CompressionNone || cr.chunk.FileMeta.Type == mydump.SourceTypeParquet {
+                newScannedOffset, scannedOffsetErr = cr.parser.ScannedPos()
+                if scannedOffsetErr != nil {
+                    logger.Warn("fail to get data engine ScannedPos, progress may not be accurate",
+                        log.ShortError(scannedOffsetErr), zap.String("file", cr.chunk.FileMeta.Path))
+                }
+                if scannedOffset == -1 {
+                    scannedOffset = newScannedOffset
                 }
             }
 
@@ -334,7 +338,7 @@
                 }
 
                 kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset,
-                    rowID: rowID, realOffset: realOffset})
+                    rowID: rowID, realOffset: newScannedOffset})
                 kvSize += kvs.Size()
                 failpoint.Inject("mock-kv-size", func(val failpoint.Value) {
                     kvSize += uint64(val.(int))
@@ -352,7 +356,11 @@
             if m, ok := metric.FromContext(ctx); ok {
                 m.RowEncodeSecondsHistogram.Observe(encodeDur.Seconds())
                 m.RowReadSecondsHistogram.Observe(readDur.Seconds())
-                m.RowReadBytesHistogram.Observe(float64(newOffset - offset))
+                if cr.chunk.FileMeta.Type == mydump.SourceTypeParquet {
+                    m.RowReadBytesHistogram.Observe(float64(newScannedOffset - scannedOffset))
+                } else {
+                    m.RowReadBytesHistogram.Observe(float64(newOffset - offset))
+                }
             }
 
             if len(kvPacket) != 0 {
@@ -514,11 +522,18 @@ func (cr *chunkProcessor) deliverLoop(
             }
             delta := highOffset - lowOffset
             if delta >= 0 {
-                m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(delta))
+                if cr.chunk.FileMeta.Type == mydump.SourceTypeParquet {
+                    if currRealOffset > startRealOffset {
+                        m.BytesCounter.WithLabelValues(metric.StateRestored).Add(float64(currRealOffset - startRealOffset))
+                    }
+                    m.RowsCounter.WithLabelValues(metric.StateRestored, t.tableName).Add(float64(delta))
+                } else {
+                    m.BytesCounter.WithLabelValues(metric.StateRestored).Add(float64(delta))
+                    m.RowsCounter.WithLabelValues(metric.StateRestored, t.tableName).Add(float64(dataChecksum.SumKVS()))
+                }
                 if rc.status != nil && rc.status.backend == config.BackendTiDB {
                     rc.status.FinishedFileSize.Add(delta)
                 }
-                m.RowsCounter.WithLabelValues(t.tableName).Add(float64(dataChecksum.SumKVS()))
             } else {
                 deliverLogger.Warn("offset go back", zap.Int64("curr", highOffset),
                     zap.Int64("start", lowOffset))
diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go
index c577c742d4..fdf064ca16 100644
--- a/br/pkg/lightning/importer/import.go
+++ b/br/pkg/lightning/importer/import.go
@@ -64,6 +64,7 @@ import (
     "github.com/pingcap/tidb/util/mathutil"
     regexprrouter "github.com/pingcap/tidb/util/regexpr-router"
     "github.com/pingcap/tidb/util/set"
+    "github.com/prometheus/client_golang/prometheus"
     pd "github.com/tikv/pd/client"
     "go.uber.org/atomic"
     "go.uber.org/multierr"
@@ -959,14 +960,10 @@ func verifyLocalFile(ctx context.Context, cpdb checkpoints.DB, dir string) error
 func (rc *Controller) estimateChunkCountIntoMetrics(ctx context.Context) error {
     estimatedChunkCount := 0.0
     estimatedEngineCnt := int64(0)
-    batchSize := rc.cfg.Mydumper.BatchSize
-    if batchSize <= 0 {
-        // if rows in source files are not sorted by primary key(if primary is number or cluster index enabled),
-        // the key range in each data engine may have overlap, thus a bigger engine size can somewhat alleviate it.
-        batchSize = config.DefaultBatchSize
-    }
     for _, dbMeta := range rc.dbMetas {
         for _, tableMeta := range dbMeta.Tables {
+            batchSize := mydump.CalculateBatchSize(float64(rc.cfg.Mydumper.BatchSize),
+                tableMeta.IsRowOrdered, float64(tableMeta.TotalSize))
             tableName := common.UniqueTable(dbMeta.Name, tableMeta.Name)
             dbCp, err := rc.checkpointsDB.Get(ctx, tableName)
             if err != nil {
@@ -1244,8 +1241,10 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
             }
             // log the current progress periodically, so OPS will know that we're still working
             nanoseconds := float64(time.Since(start).Nanoseconds())
-            totalRestoreBytes := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.BytesStateTotalRestore))
-            restoredBytes := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.BytesStateRestored))
+            totalRestoreBytes := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.StateTotalRestore))
+            restoredBytes := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.StateRestored))
+            totalRowsToRestore := metric.ReadAllCounters(metrics.RowsCounter.MetricVec, prometheus.Labels{"state": metric.StateTotalRestore})
+            restoredRows := metric.ReadAllCounters(metrics.RowsCounter.MetricVec, prometheus.Labels{"state": metric.StateRestored})
             // the estimated chunk is not accurate(likely under estimated), but the actual count is not accurate
             // before the last table start, so use the bigger of the two should be a workaround
             estimated := metric.ReadCounter(metrics.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated))
@@ -1263,8 +1262,8 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
                 engineEstimated = enginePending
             }
             engineFinished := metric.ReadCounter(metrics.ProcessedEngineCounter.WithLabelValues(metric.TableStateImported, metric.TableResultSuccess))
-            bytesWritten := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.BytesStateRestoreWritten))
-            bytesImported := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.BytesStateImported))
+            bytesWritten := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.StateRestoreWritten))
+            bytesImported := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.StateImported))
 
             var state string
             var remaining zap.Field
@@ -1295,10 +1294,20 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
             // total progress.
             restoreBytesField := zap.Skip()
             importBytesField := zap.Skip()
+            restoreRowsField := zap.Skip()
             remaining = zap.Skip()
             totalPercent := 0.0
-            if restoredBytes > 0 {
-                restorePercent := math.Min(restoredBytes/totalRestoreBytes, 1.0)
+            if restoredBytes > 0 || restoredRows > 0 {
+                var restorePercent float64
+                if totalRowsToRestore > 0 {
+                    restorePercent = math.Min(restoredRows/totalRowsToRestore, 1.0)
+                    restoreRowsField = zap.String("restore-rows", fmt.Sprintf("%.0f/%.0f",
+                        restoredRows, totalRowsToRestore))
+                } else {
+                    restorePercent = math.Min(restoredBytes/totalRestoreBytes, 1.0)
+                    restoreRowsField = zap.String("restore-rows", fmt.Sprintf("%.0f/%.0f(estimated)",
+                        restoredRows, restoredRows/restorePercent))
+                }
                 metrics.ProgressGauge.WithLabelValues(metric.ProgressPhaseRestore).Set(restorePercent)
                 if rc.cfg.TikvImporter.Backend != config.BackendTiDB {
                     var importPercent float64
@@ -1348,7 +1357,7 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
                 zap.String("tables", fmt.Sprintf("%.0f/%.0f%s", completedTables, totalTables, formatPercent(completedTables, totalTables))),
                 zap.String("chunks", fmt.Sprintf("%.0f/%.0f%s", finished, estimated, formatPercent(finished, estimated))),
                 zap.String("engines", fmt.Sprintf("%.f/%.f%s", engineFinished, engineEstimated, formatPercent(engineFinished, engineEstimated))),
-                restoreBytesField, importBytesField,
+                restoreBytesField, restoreRowsField, importBytesField,
                 encodeSpeedField,
                 zap.String("state", state),
                 remaining,
@@ -1668,13 +1677,33 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
             allTasks = append(allTasks, task{tr: tr, cp: cp})
 
             if len(cp.Engines) == 0 {
-                for _, fi := range tableMeta.DataFiles {
+                for i, fi := range tableMeta.DataFiles {
                     totalDataSizeToRestore += fi.FileMeta.FileSize
+                    if fi.FileMeta.Type == mydump.SourceTypeParquet {
+                        numberRows, err := mydump.ReadParquetFileRowCountByFile(ctx, rc.store, fi.FileMeta)
+                        if err != nil {
+                            return errors.Trace(err)
+                        }
+                        if m, ok := metric.FromContext(ctx); ok {
+                            m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(numberRows))
+                        }
+                        fi.FileMeta.Rows = numberRows
+                        tableMeta.DataFiles[i] = fi
+                    }
                 }
             } else {
                 for _, eng := range cp.Engines {
                     for _, chunk := range eng.Chunks {
-                        totalDataSizeToRestore += chunk.UnfinishedSize()
+                        // for parquet files the file size is more accurate; we cannot calculate the correct
+                        // unfinished bytes until the reader is set up, so use the file size directly here
+                        if chunk.FileMeta.Type == mydump.SourceTypeParquet {
+                            totalDataSizeToRestore += chunk.FileMeta.FileSize
+                            if m, ok := metric.FromContext(ctx); ok {
+                                m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(chunk.UnfinishedSize()))
+                            }
+                        } else {
+                            totalDataSizeToRestore += chunk.UnfinishedSize()
+                        }
                     }
                 }
             }
@@ -1682,7 +1711,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
     }
 
     if m, ok := metric.FromContext(ctx); ok {
-        m.BytesCounter.WithLabelValues(metric.BytesStateTotalRestore).Add(float64(totalDataSizeToRestore))
+        m.BytesCounter.WithLabelValues(metric.StateTotalRestore).Add(float64(totalDataSizeToRestore))
     }
 
     for i := range allTasks {
diff --git a/br/pkg/lightning/importer/table_import.go b/br/pkg/lightning/importer/table_import.go
index 4a1916496e..14f74ffb3e 100644
--- a/br/pkg/lightning/importer/table_import.go
+++ b/br/pkg/lightning/importer/table_import.go
@@ -717,7 +717,7 @@ ChunkLoop:
             if err == nil {
                 if metrics != nil {
                     metrics.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Add(remainChunkCnt)
-                    metrics.BytesCounter.WithLabelValues(metric.BytesStateRestoreWritten).Add(float64(cr.chunk.Checksum.SumSize()))
+                    metrics.BytesCounter.WithLabelValues(metric.StateRestoreWritten).Add(float64(cr.chunk.Checksum.SumSize()))
                 }
                 if dataFlushStatus != nil && indexFlushStaus != nil {
                     if dataFlushStatus.Flushed() && indexFlushStaus.Flushed() {
@@ -751,14 +751,18 @@ ChunkLoop:
     // Report some statistics into the log for debugging.
     totalKVSize := uint64(0)
     totalSQLSize := int64(0)
+    logKeyName := "read(bytes)"
     for _, chunk := range cp.Chunks {
         totalKVSize += chunk.Checksum.SumSize()
         totalSQLSize += chunk.UnfinishedSize()
+        if chunk.FileMeta.Type == mydump.SourceTypeParquet {
+            logKeyName = "read(rows)"
+        }
     }
 
     err = chunkErr.Get()
     logTask.End(zap.ErrorLevel, err,
-        zap.Int64("read", totalSQLSize),
+        zap.Int64(logKeyName, totalSQLSize),
         zap.Uint64("written", totalKVSize),
     )
 
@@ -1238,7 +1242,7 @@ func (tr *TableImporter) addIndexes(ctx context.Context, db *sql.DB) (retErr err
 
     var totalRows int
     if m, ok := metric.FromContext(ctx); ok {
-        totalRows = int(metric.ReadCounter(m.RowsCounter.WithLabelValues(tr.tableName)))
+        totalRows = int(metric.ReadCounter(m.RowsCounter.WithLabelValues(metric.StateRestored, tableName)))
     }
 
     // Try to add all indexes in one statement.
diff --git a/br/pkg/lightning/metric/metric.go b/br/pkg/lightning/metric/metric.go
index 484dbfdca3..fbe503717e 100644
--- a/br/pkg/lightning/metric/metric.go
+++ b/br/pkg/lightning/metric/metric.go
@@ -29,10 +29,10 @@ const (
     TableStateImported  = "imported"
     TableStateCompleted = "completed"
 
-    BytesStateTotalRestore   = "total_restore" // total source data bytes needs to restore
-    BytesStateRestored       = "restored"      // source data bytes restored during restore engine
-    BytesStateRestoreWritten = "written"       // bytes written during restore engine
-    BytesStateImported       = "imported"      // bytes imported during import engine
+    StateTotalRestore   = "total_restore" // total source data bytes needs to restore
+    StateRestored       = "restored"      // source data bytes restored during restore engine
+    StateRestoreWritten = "written"       // bytes written during restore engine
+    StateImported       = "imported"      // bytes imported during import engine
 
     ProgressPhaseTotal   = "total"   // total restore progress(not include post-process, like checksum and analyze)
     ProgressPhaseRestore = "restore" // restore engine progress
@@ -134,12 +134,13 @@ func NewMetrics(factory promutil.Factory) *Metrics {
         // - running
         // - finished
         // - failed
+
         RowsCounter: factory.NewCounterVec(
             prometheus.CounterOpts{
                 Namespace: "lightning",
                 Name:      "rows",
                 Help:      "count of total rows",
-            }, []string{"table"}),
+            }, []string{"state", "table"}),
 
         ImportSecondsHistogram: factory.NewHistogram(
             prometheus.HistogramOpts{
@@ -322,6 +323,37 @@ func ReadCounter(counter prometheus.Counter) float64 {
     return metric.Counter.GetValue()
 }
 
+func metricHasLabel(labelPairs []*dto.LabelPair, labels prometheus.Labels) bool {
+    for _, label := range labelPairs {
+        if v, ok := labels[label.GetName()]; ok && v == label.GetValue() {
+            return true
+        }
+    }
+    return false
+}
+
+// ReadAllCounters reports the sum of all counters that match the given labels.
+func ReadAllCounters(metricsVec *prometheus.MetricVec, labels prometheus.Labels) float64 {
+    metricsChan := make(chan prometheus.Metric, 8)
+    go func() {
+        metricsVec.Collect(metricsChan)
+        close(metricsChan)
+    }()
+
+    var sum float64
+    for counter := range metricsChan {
+        var metric dto.Metric
+        if err := counter.Write(&metric); err != nil {
+            return math.NaN()
+        }
+        if !metricHasLabel(metric.GetLabel(), labels) {
+            continue
+        }
+        sum += metric.Counter.GetValue()
+    }
+    return sum
+}
+
 // ReadHistogramSum reports the sum of all observed values in the histogram.
 func ReadHistogramSum(histogram prometheus.Histogram) float64 {
     var metric dto.Metric
diff --git a/br/pkg/lightning/mydump/loader.go b/br/pkg/lightning/mydump/loader.go
index e70e267a5a..d670d9bfcb 100644
--- a/br/pkg/lightning/mydump/loader.go
+++ b/br/pkg/lightning/mydump/loader.go
@@ -89,6 +89,7 @@ type SourceFileMeta struct {
     // WARNING: variables below are not persistent
     ExtendData ExtendColumnData
     RealSize   int64
+    Rows       int64 // only for parquet
 }
 
 // NewMDTableMeta creates an Mydumper table meta with specified character set.
diff --git a/br/pkg/lightning/mydump/parquet_parser.go b/br/pkg/lightning/mydump/parquet_parser.go
index a1b612903c..8a597aff8a 100644
--- a/br/pkg/lightning/mydump/parquet_parser.go
+++ b/br/pkg/lightning/mydump/parquet_parser.go
@@ -47,6 +47,8 @@ type ParquetParser struct {
     curIndex    int
     lastRow     Row
     logger      log.Logger
+
+    readSeekCloser ReadSeekCloser
 }
 
 // readerWrapper is a used for implement `source.ParquetFile`
@@ -144,9 +146,9 @@ func OpenParquetReader(
     }, nil
 }
 
-// ReadParquetFileRowCount reads the parquet file row count.
+// readParquetFileRowCount reads the parquet file row count.
 // It is a special func to fetch parquet file row count fast.
-func ReadParquetFileRowCount(
+func readParquetFileRowCount(
     ctx context.Context,
     store storage.ExternalStorage,
     r storage.ReadSeekCloser,
@@ -172,6 +174,23 @@
     return numRows, nil
 }
 
+// ReadParquetFileRowCountByFile reads the parquet file row count through fileMeta.
+func ReadParquetFileRowCountByFile(
+    ctx context.Context,
+    store storage.ExternalStorage,
+    fileMeta SourceFileMeta,
+) (int64, error) {
+    r, err := store.Open(ctx, fileMeta.Path)
+    if err != nil {
+        return 0, errors.Trace(err)
+    }
+    numberRows, err := readParquetFileRowCount(ctx, store, r, fileMeta.Path)
+    if err != nil {
+        return 0, errors.Trace(err)
+    }
+    return numberRows, nil
+}
+
 // NewParquetParser generates a parquet parser.
 func NewParquetParser(
     ctx context.Context,
@@ -216,10 +235,11 @@
     }
 
     return &ParquetParser{
-        Reader:      reader,
-        columns:     columns,
-        columnMetas: columnMetas,
-        logger:      log.FromContext(ctx),
+        Reader:         reader,
+        columns:        columns,
+        columnMetas:    columnMetas,
+        logger:         log.FromContext(ctx),
+        readSeekCloser: wrapper,
     }, nil
 }
 
@@ -351,10 +371,10 @@ func (pp *ParquetParser) SetPos(pos int64, rowID int64) error {
     return nil
 }
 
-// RealPos implements the Parser interface.
-// For parquet it's equal to Pos().
-func (pp *ParquetParser) RealPos() (int64, error) {
-    return pp.curStart + int64(pp.curIndex), nil
+// ScannedPos implements the Parser interface.
+// For parquet it is the current position of the underlying file reader.
+func (pp *ParquetParser) ScannedPos() (int64, error) {
+    return pp.readSeekCloser.Seek(0, io.SeekCurrent)
 }
 
 // Close closes the parquet file of the parser.
diff --git a/br/pkg/lightning/mydump/parser.go b/br/pkg/lightning/mydump/parser.go
index d55575900f..376cc7115c 100644
--- a/br/pkg/lightning/mydump/parser.go
+++ b/br/pkg/lightning/mydump/parser.go
@@ -132,9 +132,15 @@ const (
 
 // Parser provides some methods to parse a source data file.
 type Parser interface {
+    // Pos returns the position that the parser has already handled. It's mainly used for checkpointing.
+    // For normal files it's the file offset we have handled.
+    // For parquet files it's the number of rows we have handled.
+    // For compressed files it's the uncompressed file offset we have handled.
+    // TODO: replace pos with a new structure to specify position offset and rows offset
     Pos() (pos int64, rowID int64)
     SetPos(pos int64, rowID int64) error
-    RealPos() (int64, error)
+    // ScannedPos always returns the current position of the underlying file reader.
+    ScannedPos() (int64, error)
     Close() error
     ReadRow() error
     LastRow() Row
@@ -184,8 +190,8 @@ func (parser *blockParser) SetPos(pos int64, rowID int64) error {
     return nil
 }
 
-// RealPos gets the read position of current reader.
-func (parser *blockParser) RealPos() (int64, error) {
+// ScannedPos gets the read position of the current reader.
+func (parser *blockParser) ScannedPos() (int64, error) {
     return parser.reader.Seek(0, io.SeekCurrent)
 }
 
diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go
index ac95e515c2..779a862959 100644
--- a/br/pkg/lightning/mydump/region.go
+++ b/br/pkg/lightning/mydump/region.go
@@ -240,14 +240,7 @@ func MakeTableRegions(
         rowIDBase = fileRegionsRes.regions[len(fileRegionsRes.regions)-1].Chunk.RowIDMax
     }
 
-    batchSize := float64(cfg.Mydumper.BatchSize)
-    if cfg.Mydumper.BatchSize <= 0 {
-        if meta.IsRowOrdered {
-            batchSize = float64(config.DefaultBatchSize)
-        } else {
-            batchSize = math.Max(float64(config.DefaultBatchSize), float64(meta.TotalSize))
-        }
-    }
+    batchSize := CalculateBatchSize(float64(cfg.Mydumper.BatchSize), meta.IsRowOrdered, float64(meta.TotalSize))
     log.FromContext(ctx).Info("makeTableRegions",
         zap.Int("filesCount", len(meta.DataFiles)),
         zap.Int64("MaxRegionSize", int64(cfg.Mydumper.MaxRegionSize)),
@@ -258,6 +251,19 @@
     return filesRegions, nil
 }
 
+// CalculateBatchSize calculates batch size according to row order and file size.
+func CalculateBatchSize(mydumperBatchSize float64, isRowOrdered bool, totalSize float64) float64 {
+    batchSize := mydumperBatchSize
+    if batchSize <= 0 {
+        if isRowOrdered {
+            batchSize = float64(config.DefaultBatchSize)
+        } else {
+            batchSize = math.Max(float64(config.DefaultBatchSize), totalSize)
+        }
+    }
+    return batchSize
+}
+
 // MakeSourceFileRegion create a new source file region.
 func MakeSourceFileRegion(
     ctx context.Context,
@@ -339,13 +345,14 @@ func makeParquetFileRegion(
     meta *MDTableMeta,
     dataFile FileInfo,
 ) (*TableRegion, error) {
-    r, err := store.Open(ctx, dataFile.FileMeta.Path)
-    if err != nil {
-        return nil, errors.Trace(err)
-    }
-    numberRows, err := ReadParquetFileRowCount(ctx, store, r, dataFile.FileMeta.Path)
-    if err != nil {
-        return nil, errors.Trace(err)
+    numberRows := dataFile.FileMeta.Rows
+    var err error
+    // for safety
+    if numberRows <= 0 {
+        numberRows, err = ReadParquetFileRowCountByFile(ctx, store, dataFile.FileMeta)
+        if err != nil {
+            return nil, err
+        }
     }
     region := &TableRegion{
         DB: meta.DB,
@@ -354,6 +361,7 @@
         Chunk: Chunk{
            Offset:       0,
            EndOffset:    numberRows,
+           RealOffset:   0,
            PrevRowIDMax: 0,
            RowIDMax:     numberRows,
         },
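For reference, the progress log in import.go aggregates the relabeled lightning_rows counter through the new ReadAllCounters helper, which drains a prometheus.MetricVec into a channel and sums the children whose labels match a filter. The snippet below is a minimal, self-contained sketch of that collect-and-filter pattern; the metric name, label values, and expected total are illustrative assumptions and are not code from this diff.

// Illustrative sketch only: mirrors the collect-and-filter approach used by
// ReadAllCounters against a counter vector labeled like lightning_rows.
// The metric name "demo_rows" and the label values below are made up.
package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    dto "github.com/prometheus/client_model/go"
)

func main() {
    // A counter vector keyed by state and table, like the patched RowsCounter.
    rows := prometheus.NewCounterVec(
        prometheus.CounterOpts{Name: "demo_rows", Help: "rows per state and table"},
        []string{"state", "table"},
    )
    rows.WithLabelValues("restored", "db.t1").Add(100)
    rows.WithLabelValues("restored", "db.t2").Add(50)
    rows.WithLabelValues("total_restore", "db.t1").Add(400)

    // Drain every child metric of the vector into a channel, then sum the
    // ones whose "state" label is "restored", regardless of table.
    ch := make(chan prometheus.Metric, 8)
    go func() {
        rows.Collect(ch)
        close(ch)
    }()

    var sum float64
    for m := range ch {
        var out dto.Metric
        if err := m.Write(&out); err != nil {
            continue
        }
        for _, lp := range out.GetLabel() {
            if lp.GetName() == "state" && lp.GetValue() == "restored" {
                sum += out.GetCounter().GetValue()
                break
            }
        }
    }
    fmt.Println(sum) // prints 150 (100 from db.t1 + 50 from db.t2)
}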