Lightning: fix restore-bytes and encode speed metrics, add restore-rows metrics (#42070)

close pingcap/tidb#41751
This commit is contained in:
Chunzhu Li
2023-03-22 21:38:51 +08:00
committed by GitHub
parent d9149b0afa
commit 3337cc11b1
10 changed files with 180 additions and 64 deletions

View File

@ -104,7 +104,7 @@ func (j *regionJob) convertStageTo(stage jobStageTp) {
j.engine.importedKVSize.Add(j.writeResult.rangeStats.totalBytes)
j.engine.importedKVCount.Add(j.writeResult.rangeStats.count)
if j.metrics != nil {
j.metrics.BytesCounter.WithLabelValues(metric.BytesStateImported).
j.metrics.BytesCounter.WithLabelValues(metric.StateImported).
Add(float64(j.writeResult.rangeStats.totalBytes))
}
}

View File

@ -78,6 +78,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",

View File

@ -256,20 +256,24 @@ func (cr *chunkProcessor) encodeLoop(
canDeliver := false
kvPacket := make([]deliveredKVs, 0, maxKvPairsCnt)
curOffset := offset
var newOffset, rowID, realOffset int64
var newOffset, rowID, newScannedOffset int64
var scannedOffset int64 = -1
var kvSize uint64
var realOffsetErr error
var scannedOffsetErr error
outLoop:
for !canDeliver {
readDurStart := time.Now()
err = cr.parser.ReadRow()
columnNames := cr.parser.Columns()
newOffset, rowID = cr.parser.Pos()
if cr.chunk.FileMeta.Compression != mydump.CompressionNone {
realOffset, realOffsetErr = cr.parser.RealPos()
if realOffsetErr != nil {
logger.Warn("fail to get data engine RealPos, progress may not be accurate",
log.ShortError(realOffsetErr), zap.String("file", cr.chunk.FileMeta.Path))
if cr.chunk.FileMeta.Compression != mydump.CompressionNone || cr.chunk.FileMeta.Type == mydump.SourceTypeParquet {
newScannedOffset, scannedOffsetErr = cr.parser.ScannedPos()
if scannedOffsetErr != nil {
logger.Warn("fail to get data engine ScannedPos, progress may not be accurate",
log.ShortError(scannedOffsetErr), zap.String("file", cr.chunk.FileMeta.Path))
}
if scannedOffset == -1 {
scannedOffset = newScannedOffset
}
}
@ -334,7 +338,7 @@ func (cr *chunkProcessor) encodeLoop(
}
kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset,
rowID: rowID, realOffset: realOffset})
rowID: rowID, realOffset: newScannedOffset})
kvSize += kvs.Size()
failpoint.Inject("mock-kv-size", func(val failpoint.Value) {
kvSize += uint64(val.(int))
@ -352,7 +356,11 @@ func (cr *chunkProcessor) encodeLoop(
if m, ok := metric.FromContext(ctx); ok {
m.RowEncodeSecondsHistogram.Observe(encodeDur.Seconds())
m.RowReadSecondsHistogram.Observe(readDur.Seconds())
m.RowReadBytesHistogram.Observe(float64(newOffset - offset))
if cr.chunk.FileMeta.Type == mydump.SourceTypeParquet {
m.RowReadBytesHistogram.Observe(float64(newScannedOffset - scannedOffset))
} else {
m.RowReadBytesHistogram.Observe(float64(newOffset - offset))
}
}
if len(kvPacket) != 0 {
@ -514,11 +522,18 @@ func (cr *chunkProcessor) deliverLoop(
}
delta := highOffset - lowOffset
if delta >= 0 {
m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(delta))
if cr.chunk.FileMeta.Type == mydump.SourceTypeParquet {
if currRealOffset > startRealOffset {
m.BytesCounter.WithLabelValues(metric.StateRestored).Add(float64(currRealOffset - startRealOffset))
}
m.RowsCounter.WithLabelValues(metric.StateRestored, t.tableName).Add(float64(delta))
} else {
m.BytesCounter.WithLabelValues(metric.StateRestored).Add(float64(delta))
m.RowsCounter.WithLabelValues(metric.StateRestored, t.tableName).Add(float64(dataChecksum.SumKVS()))
}
if rc.status != nil && rc.status.backend == config.BackendTiDB {
rc.status.FinishedFileSize.Add(delta)
}
m.RowsCounter.WithLabelValues(t.tableName).Add(float64(dataChecksum.SumKVS()))
} else {
deliverLogger.Warn("offset go back", zap.Int64("curr", highOffset),
zap.Int64("start", lowOffset))

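As a reading aid, here is a minimal, self-contained sketch of the delivery-side accounting this hunk introduces. The `counters` struct and `recordDelivered` function are hypothetical stand-ins for the real Prometheus counter vectors and the `deliverLoop` logic, not Lightning's API:

```go
package main

import "fmt"

// counters stands in for the BytesCounter / RowsCounter vectors used above.
type counters struct {
	restoredBytes float64
	restoredRows  float64
}

// recordDelivered mirrors the branching in deliverLoop: parquet chunks measure
// progress in rows (their offsets are row indices), so restored bytes come from
// the scanned file position, while other formats keep using the byte-offset
// delta and take the row count from the checksum's KV count.
func recordDelivered(c *counters, isParquet bool,
	lowOffset, highOffset, startRealOffset, currRealOffset, kvCount int64) {
	delta := highOffset - lowOffset
	if delta < 0 {
		return // offset went backwards; nothing to record
	}
	if isParquet {
		if currRealOffset > startRealOffset {
			c.restoredBytes += float64(currRealOffset - startRealOffset)
		}
		c.restoredRows += float64(delta) // offsets count rows for parquet
	} else {
		c.restoredBytes += float64(delta)
		c.restoredRows += float64(kvCount)
	}
}

func main() {
	var c counters
	recordDelivered(&c, true, 0, 1000, 0, 4096, 1000) // parquet: 1000 rows, 4096 scanned bytes
	recordDelivered(&c, false, 0, 8192, 0, 0, 1000)   // CSV/SQL: 8192 bytes, 1000 KV pairs
	fmt.Println(c.restoredBytes, c.restoredRows)      // 12288 2000
}
```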
View File

@ -64,6 +64,7 @@ import (
"github.com/pingcap/tidb/util/mathutil"
regexprrouter "github.com/pingcap/tidb/util/regexpr-router"
"github.com/pingcap/tidb/util/set"
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"go.uber.org/multierr"
@ -959,14 +960,10 @@ func verifyLocalFile(ctx context.Context, cpdb checkpoints.DB, dir string) error
func (rc *Controller) estimateChunkCountIntoMetrics(ctx context.Context) error {
estimatedChunkCount := 0.0
estimatedEngineCnt := int64(0)
batchSize := rc.cfg.Mydumper.BatchSize
if batchSize <= 0 {
// if rows in source files are not sorted by primary key(if primary is number or cluster index enabled),
// the key range in each data engine may have overlap, thus a bigger engine size can somewhat alleviate it.
batchSize = config.DefaultBatchSize
}
for _, dbMeta := range rc.dbMetas {
for _, tableMeta := range dbMeta.Tables {
batchSize := mydump.CalculateBatchSize(float64(rc.cfg.Mydumper.BatchSize),
tableMeta.IsRowOrdered, float64(tableMeta.TotalSize))
tableName := common.UniqueTable(dbMeta.Name, tableMeta.Name)
dbCp, err := rc.checkpointsDB.Get(ctx, tableName)
if err != nil {
@ -1244,8 +1241,10 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
}
// log the current progress periodically, so OPS will know that we're still working
nanoseconds := float64(time.Since(start).Nanoseconds())
totalRestoreBytes := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.BytesStateTotalRestore))
restoredBytes := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.BytesStateRestored))
totalRestoreBytes := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.StateTotalRestore))
restoredBytes := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.StateRestored))
totalRowsToRestore := metric.ReadAllCounters(metrics.RowsCounter.MetricVec, prometheus.Labels{"state": metric.StateTotalRestore})
restoredRows := metric.ReadAllCounters(metrics.RowsCounter.MetricVec, prometheus.Labels{"state": metric.StateRestored})
// the estimated chunk count is not accurate (likely underestimated), but the actual count is not accurate
// before the last table starts, so using the bigger of the two is a reasonable workaround
estimated := metric.ReadCounter(metrics.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated))
@ -1263,8 +1262,8 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
engineEstimated = enginePending
}
engineFinished := metric.ReadCounter(metrics.ProcessedEngineCounter.WithLabelValues(metric.TableStateImported, metric.TableResultSuccess))
bytesWritten := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.BytesStateRestoreWritten))
bytesImported := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.BytesStateImported))
bytesWritten := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.StateRestoreWritten))
bytesImported := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.StateImported))
var state string
var remaining zap.Field
@ -1295,10 +1294,20 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
// total progress.
restoreBytesField := zap.Skip()
importBytesField := zap.Skip()
restoreRowsField := zap.Skip()
remaining = zap.Skip()
totalPercent := 0.0
if restoredBytes > 0 {
restorePercent := math.Min(restoredBytes/totalRestoreBytes, 1.0)
if restoredBytes > 0 || restoredRows > 0 {
var restorePercent float64
if totalRowsToRestore > 0 {
restorePercent = math.Min(restoredRows/totalRowsToRestore, 1.0)
restoreRowsField = zap.String("restore-rows", fmt.Sprintf("%.0f/%.0f",
restoredRows, totalRowsToRestore))
} else {
restorePercent = math.Min(restoredBytes/totalRestoreBytes, 1.0)
restoreRowsField = zap.String("restore-rows", fmt.Sprintf("%.0f/%.0f(estimated)",
restoredRows, restoredRows/restorePercent))
}
metrics.ProgressGauge.WithLabelValues(metric.ProgressPhaseRestore).Set(restorePercent)
if rc.cfg.TikvImporter.Backend != config.BackendTiDB {
var importPercent float64
@ -1348,7 +1357,7 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
zap.String("tables", fmt.Sprintf("%.0f/%.0f%s", completedTables, totalTables, formatPercent(completedTables, totalTables))),
zap.String("chunks", fmt.Sprintf("%.0f/%.0f%s", finished, estimated, formatPercent(finished, estimated))),
zap.String("engines", fmt.Sprintf("%.f/%.f%s", engineFinished, engineEstimated, formatPercent(engineFinished, engineEstimated))),
restoreBytesField, importBytesField,
restoreBytesField, restoreRowsField, importBytesField,
encodeSpeedField,
zap.String("state", state),
remaining,
@ -1668,13 +1677,33 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
allTasks = append(allTasks, task{tr: tr, cp: cp})
if len(cp.Engines) == 0 {
for _, fi := range tableMeta.DataFiles {
for i, fi := range tableMeta.DataFiles {
totalDataSizeToRestore += fi.FileMeta.FileSize
if fi.FileMeta.Type == mydump.SourceTypeParquet {
numberRows, err := mydump.ReadParquetFileRowCountByFile(ctx, rc.store, fi.FileMeta)
if err != nil {
return errors.Trace(err)
}
if m, ok := metric.FromContext(ctx); ok {
m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(numberRows))
}
fi.FileMeta.Rows = numberRows
tableMeta.DataFiles[i] = fi
}
}
} else {
for _, eng := range cp.Engines {
for _, chunk := range eng.Chunks {
totalDataSizeToRestore += chunk.UnfinishedSize()
// for parquet files the file size is more accurate; we cannot calculate the correct unfinished bytes unless
// we set up the reader, so we directly use the file size here
if chunk.FileMeta.Type == mydump.SourceTypeParquet {
totalDataSizeToRestore += chunk.FileMeta.FileSize
if m, ok := metric.FromContext(ctx); ok {
m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(chunk.UnfinishedSize()))
}
} else {
totalDataSizeToRestore += chunk.UnfinishedSize()
}
}
}
}
@ -1682,7 +1711,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
}
if m, ok := metric.FromContext(ctx); ok {
m.BytesCounter.WithLabelValues(metric.BytesStateTotalRestore).Add(float64(totalDataSizeToRestore))
m.BytesCounter.WithLabelValues(metric.StateTotalRestore).Add(float64(totalDataSizeToRestore))
}
for i := range allTasks {

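The progress log above prefers row counts when a total is known (parquet sources report their row counts up front) and falls back to bytes otherwise. A hedged sketch of that calculation, with `restoreProgress` as an illustrative helper rather than the controller's real code:

```go
package main

import (
	"fmt"
	"math"
)

// restoreProgress returns the restore percentage plus a human-readable
// "restore-rows" value, preferring rows when a total is available and
// falling back to the byte-based ratio (with the row total then estimated).
// Call it only when something has been restored, mirroring the guard above.
func restoreProgress(restoredBytes, totalRestoreBytes, restoredRows, totalRowsToRestore float64) (float64, string) {
	if totalRowsToRestore > 0 {
		pct := math.Min(restoredRows/totalRowsToRestore, 1.0)
		return pct, fmt.Sprintf("%.0f/%.0f", restoredRows, totalRowsToRestore)
	}
	pct := math.Min(restoredBytes/totalRestoreBytes, 1.0)
	return pct, fmt.Sprintf("%.0f/%.0f(estimated)", restoredRows, restoredRows/pct)
}

func main() {
	pct, rows := restoreProgress(0, 0, 500, 2000) // row-based (parquet)
	fmt.Printf("%.2f %s\n", pct, rows)            // 0.25 500/2000

	pct, rows = restoreProgress(1<<20, 4<<20, 300, 0) // byte-based fallback
	fmt.Printf("%.2f %s\n", pct, rows)                // 0.25 300/1200(estimated)
}
```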
View File

@ -717,7 +717,7 @@ ChunkLoop:
if err == nil {
if metrics != nil {
metrics.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Add(remainChunkCnt)
metrics.BytesCounter.WithLabelValues(metric.BytesStateRestoreWritten).Add(float64(cr.chunk.Checksum.SumSize()))
metrics.BytesCounter.WithLabelValues(metric.StateRestoreWritten).Add(float64(cr.chunk.Checksum.SumSize()))
}
if dataFlushStatus != nil && indexFlushStaus != nil {
if dataFlushStatus.Flushed() && indexFlushStaus.Flushed() {
@ -751,14 +751,18 @@ ChunkLoop:
// Report some statistics into the log for debugging.
totalKVSize := uint64(0)
totalSQLSize := int64(0)
logKeyName := "read(bytes)"
for _, chunk := range cp.Chunks {
totalKVSize += chunk.Checksum.SumSize()
totalSQLSize += chunk.UnfinishedSize()
if chunk.FileMeta.Type == mydump.SourceTypeParquet {
logKeyName = "read(rows)"
}
}
err = chunkErr.Get()
logTask.End(zap.ErrorLevel, err,
zap.Int64("read", totalSQLSize),
zap.Int64(logKeyName, totalSQLSize),
zap.Uint64("written", totalKVSize),
)
@ -1238,7 +1242,7 @@ func (tr *TableImporter) addIndexes(ctx context.Context, db *sql.DB) (retErr err
var totalRows int
if m, ok := metric.FromContext(ctx); ok {
totalRows = int(metric.ReadCounter(m.RowsCounter.WithLabelValues(tr.tableName)))
totalRows = int(metric.ReadCounter(m.RowsCounter.WithLabelValues(metric.StateRestored, tableName)))
}
// Try to add all indexes in one statement.

View File

@ -29,10 +29,10 @@ const (
TableStateImported = "imported"
TableStateCompleted = "completed"
BytesStateTotalRestore = "total_restore" // total source data bytes needs to restore
BytesStateRestored = "restored" // source data bytes restored during restore engine
BytesStateRestoreWritten = "written" // bytes written during restore engine
BytesStateImported = "imported" // bytes imported during import engine
StateTotalRestore = "total_restore" // total source data bytes needs to restore
StateRestored = "restored" // source data bytes restored during restore engine
StateRestoreWritten = "written" // bytes written during restore engine
StateImported = "imported" // bytes imported during import engine
ProgressPhaseTotal = "total" // total restore progress(not include post-process, like checksum and analyze)
ProgressPhaseRestore = "restore" // restore engine progress
@ -134,12 +134,13 @@ func NewMetrics(factory promutil.Factory) *Metrics {
// - running
// - finished
// - failed
RowsCounter: factory.NewCounterVec(
prometheus.CounterOpts{
Namespace: "lightning",
Name: "rows",
Help: "count of total rows",
}, []string{"table"}),
}, []string{"state", "table"}),
ImportSecondsHistogram: factory.NewHistogram(
prometheus.HistogramOpts{
@ -322,6 +323,37 @@ func ReadCounter(counter prometheus.Counter) float64 {
return metric.Counter.GetValue()
}
func metricHasLabel(labelPairs []*dto.LabelPair, labels prometheus.Labels) bool {
for _, label := range labelPairs {
if v, ok := labels[label.GetName()]; ok && v == label.GetValue() {
return true
}
}
return false
}
// ReadAllCounters reports the sum of all counters matching the given labels.
func ReadAllCounters(metricsVec *prometheus.MetricVec, labels prometheus.Labels) float64 {
metricsChan := make(chan prometheus.Metric, 8)
go func() {
metricsVec.Collect(metricsChan)
close(metricsChan)
}()
var sum float64
for counter := range metricsChan {
var metric dto.Metric
if err := counter.Write(&metric); err != nil {
return math.NaN()
}
if !metricHasLabel(metric.GetLabel(), labels) {
continue
}
sum += metric.Counter.GetValue()
}
return sum
}
// ReadHistogramSum reports the sum of all observed values in the histogram.
func ReadHistogramSum(histogram prometheus.Histogram) float64 {
var metric dto.Metric

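`ReadAllCounters` is what the controller uses to sum a labeled counter vector across every table. A self-contained usage sketch; it reproduces the two helpers above as unexported functions so it compiles on its own against the standard Prometheus client:

```go
package main

import (
	"fmt"
	"math"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// metricHasLabel and readAllCounters copy the helpers introduced in this
// commit; in Lightning they live in the metric package.
func metricHasLabel(labelPairs []*dto.LabelPair, labels prometheus.Labels) bool {
	for _, label := range labelPairs {
		if v, ok := labels[label.GetName()]; ok && v == label.GetValue() {
			return true
		}
	}
	return false
}

func readAllCounters(vec *prometheus.MetricVec, labels prometheus.Labels) float64 {
	ch := make(chan prometheus.Metric, 8)
	go func() {
		vec.Collect(ch)
		close(ch)
	}()
	var sum float64
	for c := range ch {
		var m dto.Metric
		if err := c.Write(&m); err != nil {
			return math.NaN()
		}
		if !metricHasLabel(m.GetLabel(), labels) {
			continue
		}
		sum += m.Counter.GetValue()
	}
	return sum
}

func main() {
	// Same shape as the RowsCounter vector above: labels {state, table}.
	rows := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "lightning",
		Name:      "rows",
		Help:      "count of total rows",
	}, []string{"state", "table"})

	rows.WithLabelValues("restored", "db.t1").Add(100)
	rows.WithLabelValues("restored", "db.t2").Add(250)
	rows.WithLabelValues("total_restore", "db.t1").Add(1000)

	// Sum every table's "restored" series, ignoring the table label.
	fmt.Println(readAllCounters(rows.MetricVec, prometheus.Labels{"state": "restored"})) // 350
}
```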
View File

@ -89,6 +89,7 @@ type SourceFileMeta struct {
// WARNING: variables below are not persistent
ExtendData ExtendColumnData
RealSize int64
Rows int64 // only for parquet
}
// NewMDTableMeta creates an Mydumper table meta with specified character set.

View File

@ -47,6 +47,8 @@ type ParquetParser struct {
curIndex int
lastRow Row
logger log.Logger
readSeekCloser ReadSeekCloser
}
// readerWrapper is used to implement `source.ParquetFile`
@ -144,9 +146,9 @@ func OpenParquetReader(
}, nil
}
// ReadParquetFileRowCount reads the parquet file row count.
// readParquetFileRowCount reads the parquet file row count.
// It is a special func to fetch parquet file row count fast.
func ReadParquetFileRowCount(
func readParquetFileRowCount(
ctx context.Context,
store storage.ExternalStorage,
r storage.ReadSeekCloser,
@ -172,6 +174,23 @@ func ReadParquetFileRowCount(
return numRows, nil
}
// ReadParquetFileRowCountByFile reads the parquet file row count through fileMeta.
func ReadParquetFileRowCountByFile(
ctx context.Context,
store storage.ExternalStorage,
fileMeta SourceFileMeta,
) (int64, error) {
r, err := store.Open(ctx, fileMeta.Path)
if err != nil {
return 0, errors.Trace(err)
}
numberRows, err := readParquetFileRowCount(ctx, store, r, fileMeta.Path)
if err != nil {
return 0, errors.Trace(err)
}
return numberRows, nil
}
// NewParquetParser generates a parquet parser.
func NewParquetParser(
ctx context.Context,
@ -216,10 +235,11 @@ func NewParquetParser(
}
return &ParquetParser{
Reader: reader,
columns: columns,
columnMetas: columnMetas,
logger: log.FromContext(ctx),
Reader: reader,
columns: columns,
columnMetas: columnMetas,
logger: log.FromContext(ctx),
readSeekCloser: wrapper,
}, nil
}
@ -351,10 +371,10 @@ func (pp *ParquetParser) SetPos(pos int64, rowID int64) error {
return nil
}
// RealPos implements the Parser interface.
// For parquet it's equal to Pos().
func (pp *ParquetParser) RealPos() (int64, error) {
return pp.curStart + int64(pp.curIndex), nil
// ScannedPos implements the Parser interface.
// For parquet it's the parquet file reader's current position.
func (pp *ParquetParser) ScannedPos() (int64, error) {
return pp.readSeekCloser.Seek(0, io.SeekCurrent)
}
// Close closes the parquet file of the parser.

View File

@ -132,9 +132,15 @@ const (
// Parser provides some methods to parse a source data file.
type Parser interface {
// Pos returns the position that the parser has already handled. It's mainly used for checkpoints.
// For normal files it's the file offset we handled.
// For parquet files it's the row count we handled.
// For compressed files it's the uncompressed file offset we handled.
// TODO: replace pos with a new structure to specify position offset and rows offset
Pos() (pos int64, rowID int64)
SetPos(pos int64, rowID int64) error
RealPos() (int64, error)
// ScannedPos always returns the current file reader pointer's location
ScannedPos() (int64, error)
Close() error
ReadRow() error
LastRow() Row
@ -184,8 +190,8 @@ func (parser *blockParser) SetPos(pos int64, rowID int64) error {
return nil
}
// RealPos gets the read position of current reader.
func (parser *blockParser) RealPos() (int64, error) {
// ScannedPos gets the read position of the current reader.
func (parser *blockParser) ScannedPos() (int64, error) {
return parser.reader.Seek(0, io.SeekCurrent)
}

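To make the `Pos`/`ScannedPos` contract above concrete: `Pos` is the logical, checkpoint-facing position (a byte offset for plain files, a row count for parquet), while `ScannedPos` is always the raw reader position that drives byte-based progress for compressed and parquet sources. A toy parser, not one of Lightning's real parsers, illustrating the distinction:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// toyParser pretends to be a parquet-style parser: rows are its logical unit,
// but it still scans bytes from an underlying reader.
type toyParser struct {
	reader   *bytes.Reader
	rowsRead int64
}

// Pos returns the logical position used for checkpoints: rows handled here.
func (p *toyParser) Pos() (pos int64, rowID int64) {
	return p.rowsRead, p.rowsRead
}

// ScannedPos returns the reader pointer's current byte location.
func (p *toyParser) ScannedPos() (int64, error) {
	return p.reader.Seek(0, io.SeekCurrent)
}

// readRow pretends every encoded row occupies 16 bytes of the source file.
func (p *toyParser) readRow() {
	buf := make([]byte, 16)
	_, _ = p.reader.Read(buf)
	p.rowsRead++
}

func main() {
	p := &toyParser{reader: bytes.NewReader(make([]byte, 64))}
	p.readRow()
	p.readRow()
	pos, _ := p.Pos()
	scanned, _ := p.ScannedPos()
	fmt.Println(pos, scanned) // 2 rows handled, 32 bytes scanned
}
```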
View File

@ -240,14 +240,7 @@ func MakeTableRegions(
rowIDBase = fileRegionsRes.regions[len(fileRegionsRes.regions)-1].Chunk.RowIDMax
}
batchSize := float64(cfg.Mydumper.BatchSize)
if cfg.Mydumper.BatchSize <= 0 {
if meta.IsRowOrdered {
batchSize = float64(config.DefaultBatchSize)
} else {
batchSize = math.Max(float64(config.DefaultBatchSize), float64(meta.TotalSize))
}
}
batchSize := CalculateBatchSize(float64(cfg.Mydumper.BatchSize), meta.IsRowOrdered, float64(meta.TotalSize))
log.FromContext(ctx).Info("makeTableRegions", zap.Int("filesCount", len(meta.DataFiles)),
zap.Int64("MaxRegionSize", int64(cfg.Mydumper.MaxRegionSize)),
@ -258,6 +251,19 @@ func MakeTableRegions(
return filesRegions, nil
}
// CalculateBatchSize calculates batch size according to row order and file size.
func CalculateBatchSize(mydumperBatchSize float64, isRowOrdered bool, totalSize float64) float64 {
batchSize := mydumperBatchSize
if batchSize <= 0 {
if isRowOrdered {
batchSize = float64(config.DefaultBatchSize)
} else {
batchSize = math.Max(float64(config.DefaultBatchSize), totalSize)
}
}
return batchSize
}
// MakeSourceFileRegion create a new source file region.
func MakeSourceFileRegion(
ctx context.Context,
@ -339,13 +345,14 @@ func makeParquetFileRegion(
meta *MDTableMeta,
dataFile FileInfo,
) (*TableRegion, error) {
r, err := store.Open(ctx, dataFile.FileMeta.Path)
if err != nil {
return nil, errors.Trace(err)
}
numberRows, err := ReadParquetFileRowCount(ctx, store, r, dataFile.FileMeta.Path)
if err != nil {
return nil, errors.Trace(err)
numberRows := dataFile.FileMeta.Rows
var err error
// for safety
if numberRows <= 0 {
numberRows, err = ReadParquetFileRowCountByFile(ctx, store, dataFile.FileMeta)
if err != nil {
return nil, err
}
}
region := &TableRegion{
DB: meta.DB,
@ -354,6 +361,7 @@ func makeParquetFileRegion(
Chunk: Chunk{
Offset: 0,
EndOffset: numberRows,
RealOffset: 0,
PrevRowIDMax: 0,
RowIDMax: numberRows,
},