diff --git a/dumpling/export/config.go b/dumpling/export/config.go index 35fdb19fda..f23927f8c3 100644 --- a/dumpling/export/config.go +++ b/dumpling/export/config.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/promutil" filter "github.com/pingcap/tidb/util/table-filter" ) @@ -80,7 +81,6 @@ const ( // Config is the dump config for dumpling type Config struct { storage.BackendOptions - ExtStorage storage.ExternalStorage `json:"-"` specifiedTables bool AllowCleartextPasswords bool @@ -124,21 +124,26 @@ type Config struct { CsvDelimiter string Databases []string - TableFilter filter.Filter `json:"-"` - Where string - FileType string - ServerInfo version.ServerInfo - Logger *zap.Logger `json:"-"` - OutputFileTemplate *template.Template `json:"-"` - Rows uint64 - ReadTimeout time.Duration - TiDBMemQuotaQuery uint64 - FileSize uint64 - StatementSize uint64 - SessionParams map[string]interface{} + TableFilter filter.Filter `json:"-"` + Where string + FileType string + ServerInfo version.ServerInfo + Logger *zap.Logger `json:"-"` + OutputFileTemplate *template.Template `json:"-"` + Rows uint64 + ReadTimeout time.Duration + TiDBMemQuotaQuery uint64 + FileSize uint64 + StatementSize uint64 + SessionParams map[string]interface{} + // TODO: deprecate it Labels prometheus.Labels `json:"-"` Tables DatabaseTables CollationCompatible string + + // fields below are injected from DM or dataflow engine + ExtStorage storage.ExternalStorage `json:"-"` + PromFactory promutil.Factory } // ServerInfoUnknown is the unknown database type to dumpling diff --git a/dumpling/export/dump.go b/dumpling/export/dump.go index 1d290e89ee..bb461ed580 100755 --- a/dumpling/export/dump.go +++ b/dumpling/export/dump.go @@ -45,8 +45,9 @@ var emptyHandleValsErr = errors.New("empty handleVals for TiDB table") // Dumper is the dump progress structure type Dumper struct { tctx *tcontext.Context - conf *Config cancelCtx context.CancelFunc + conf *Config + metrics *metrics extStore storage.ExternalStorage dbHandle *sql.DB @@ -79,6 +80,13 @@ func NewDumper(ctx context.Context, conf *Config) (*Dumper, error) { cancelCtx: cancelFn, selectTiDBTableRegionFunc: selectTiDBTableRegion, } + + if conf.PromFactory == nil { + d.metrics = defaultMetrics + } else { + d.metrics = newMetrics(conf.PromFactory, []string{}) + } + err := adjustConfig(conf, registerTLSConfig, validateSpecifiedSQL, @@ -211,7 +219,7 @@ func (d *Dumper) Dump() (dumpErr error) { } taskChan := make(chan Task, defaultDumpThreads) - AddGauge(taskChannelCapacity, conf.Labels, defaultDumpThreads) + AddGauge(d.metrics.taskChannelCapacity, conf.Labels, defaultDumpThreads) wg, writingCtx := errgroup.WithContext(tctx) writerCtx := tctx.WithContext(writingCtx) writers, tearDownWriters, err := d.startWriters(writerCtx, wg, taskChan, rebuildConn) @@ -290,11 +298,11 @@ func (d *Dumper) startWriters(tctx *tcontext.Context, wg *errgroup.Group, taskCh if err != nil { return nil, func() {}, err } - writer := NewWriter(tctx, int64(i), conf, conn, d.extStore) + writer := NewWriter(tctx, int64(i), conf, conn, d.extStore, d.metrics) writer.rebuildConnFn = rebuildConnFn writer.setFinishTableCallBack(func(task Task) { if _, ok := task.(*TaskTableData); ok { - IncCounter(finishedTablesCounter, conf.Labels) + IncCounter(d.metrics.finishedTablesCounter, conf.Labels) // FIXME: actually finishing the last chunk doesn't means this table is 'finished'. // We can call this table is 'finished' if all its chunks are finished. // Comment this log now to avoid ambiguity. @@ -304,7 +312,7 @@ func (d *Dumper) startWriters(tctx *tcontext.Context, wg *errgroup.Group, taskCh } }) writer.setFinishTaskCallBack(func(task Task) { - IncGauge(taskChannelCapacity, conf.Labels) + IncGauge(d.metrics.taskChannelCapacity, conf.Labels) if td, ok := task.(*TaskTableData); ok { tctx.L().Debug("finish dumping table data task", zap.String("database", td.Meta.DatabaseName()), @@ -560,7 +568,7 @@ func (d *Dumper) dumpTableData(tctx *tcontext.Context, conn *BaseConn, meta Tabl // Update total rows fieldName, _ := pickupPossibleField(tctx, meta, conn) c := estimateCount(tctx, meta.DatabaseName(), meta.TableName(), conn, fieldName, conf) - AddCounter(estimateTotalRowsCounter, conf.Labels, float64(c)) + AddCounter(d.metrics.estimateTotalRowsCounter, conf.Labels, float64(c)) if conf.Rows == UnspecifiedSize { return d.sequentialDumpTable(tctx, conn, meta, taskChan) @@ -765,7 +773,7 @@ func (d *Dumper) sendTaskToChan(tctx *tcontext.Context, task Task, taskChan chan case taskChan <- task: tctx.L().Debug("send task to writer", zap.String("task", task.Brief())) - DecGauge(taskChannelCapacity, conf.Labels) + DecGauge(d.metrics.taskChannelCapacity, conf.Labels) return false } } @@ -1201,7 +1209,7 @@ func (d *Dumper) dumpSQL(tctx *tcontext.Context, metaConn *BaseConn, taskChan ch data := newTableData(conf.SQL, 0, true) task := NewTaskTableData(meta, data, 0, 1) c := detectEstimateRows(tctx, metaConn, fmt.Sprintf("EXPLAIN %s", conf.SQL), []string{"rows", "estRows", "count"}) - AddCounter(estimateTotalRowsCounter, conf.Labels, float64(c)) + AddCounter(d.metrics.estimateTotalRowsCounter, conf.Labels, float64(c)) atomic.StoreInt64(&d.totalTables, int64(1)) d.sendTaskToChan(tctx, task, taskChan) } diff --git a/dumpling/export/ir_impl_test.go b/dumpling/export/ir_impl_test.go index 2f8681b3c1..60a47e57c7 100644 --- a/dumpling/export/ir_impl_test.go +++ b/dumpling/export/ir_impl_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/DATA-DOG/go-sqlmock" + "github.com/pingcap/tidb/util/promutil" "github.com/stretchr/testify/require" ) @@ -98,7 +99,8 @@ func TestChunkRowIter(t *testing.T) { sqlRowIter := newRowIter(rows, 2) res := newSimpleRowReceiver(2) - wp := newWriterPipe(nil, testFileSize, testStatementSize, nil) + metrics := newMetrics(&promutil.PlainNoAutoRegisterFactory{}, []string{}) + wp := newWriterPipe(nil, testFileSize, testStatementSize, metrics, nil) var resSize [][]uint64 for sqlRowIter.HasNext() { diff --git a/dumpling/export/metrics.go b/dumpling/export/metrics.go index 2a812ea2a0..4bf3ed5337 100644 --- a/dumpling/export/metrics.go +++ b/dumpling/export/metrics.go @@ -5,11 +5,12 @@ package export import ( "math" + "github.com/pingcap/tidb/util/promutil" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" ) -var ( +type metrics struct { finishedSizeGauge *prometheus.GaugeVec finishedRowsGauge *prometheus.GaugeVec finishedTablesCounter *prometheus.CounterVec @@ -18,7 +19,72 @@ var ( receiveWriteChunkTimeHistogram *prometheus.HistogramVec errorCount *prometheus.CounterVec taskChannelCapacity *prometheus.GaugeVec -) +} + +var defaultMetrics *metrics + +func newMetrics(f promutil.Factory, labelNames []string) *metrics { + m := metrics{} + m.finishedSizeGauge = f.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "dumpling", + Subsystem: "dump", + Name: "finished_size", + Help: "counter for dumpling finished file size", + }, labelNames) + m.estimateTotalRowsCounter = f.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "dumpling", + Subsystem: "dump", + Name: "estimate_total_rows", + Help: "estimate total rows for dumpling tables", + }, labelNames) + m.finishedRowsGauge = f.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "dumpling", + Subsystem: "dump", + Name: "finished_rows", + Help: "counter for dumpling finished rows", + }, labelNames) + m.finishedTablesCounter = f.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "dumpling", + Subsystem: "dump", + Name: "finished_tables", + Help: "counter for dumpling finished tables", + }, labelNames) + m.writeTimeHistogram = f.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dumpling", + Subsystem: "write", + Name: "write_duration_time", + Help: "Bucketed histogram of write time (s) of files", + Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20), + }, labelNames) + m.receiveWriteChunkTimeHistogram = f.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dumpling", + Subsystem: "write", + Name: "receive_chunk_duration_time", + Help: "Bucketed histogram of receiving time (s) of chunks", + Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20), + }, labelNames) + m.errorCount = f.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "dumpling", + Subsystem: "dump", + Name: "error_count", + Help: "Total error count during dumping progress", + }, labelNames) + m.taskChannelCapacity = f.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "dumpling", + Subsystem: "dump", + Name: "channel_capacity", + Help: "The task channel capacity during dumping progress", + }, labelNames) + return &m +} // InitMetricsVector inits metrics vectors. // This function must run before RegisterMetrics @@ -27,94 +93,37 @@ func InitMetricsVector(labels prometheus.Labels) { for name := range labels { labelNames = append(labelNames, name) } - finishedSizeGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "dumpling", - Subsystem: "dump", - Name: "finished_size", - Help: "counter for dumpling finished file size", - }, labelNames) - estimateTotalRowsCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "dumpling", - Subsystem: "dump", - Name: "estimate_total_rows", - Help: "estimate total rows for dumpling tables", - }, labelNames) - finishedRowsGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "dumpling", - Subsystem: "dump", - Name: "finished_rows", - Help: "counter for dumpling finished rows", - }, labelNames) - finishedTablesCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "dumpling", - Subsystem: "dump", - Name: "finished_tables", - Help: "counter for dumpling finished tables", - }, labelNames) - writeTimeHistogram = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "dumpling", - Subsystem: "write", - Name: "write_duration_time", - Help: "Bucketed histogram of write time (s) of files", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20), - }, labelNames) - receiveWriteChunkTimeHistogram = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "dumpling", - Subsystem: "write", - Name: "receive_chunk_duration_time", - Help: "Bucketed histogram of receiving time (s) of chunks", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20), - }, labelNames) - errorCount = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "dumpling", - Subsystem: "dump", - Name: "error_count", - Help: "Total error count during dumping progress", - }, labelNames) - taskChannelCapacity = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "dumpling", - Subsystem: "dump", - Name: "channel_capacity", - Help: "The task channel capacity during dumping progress", - }, labelNames) + defaultMetrics = newMetrics(&promutil.PlainNoAutoRegisterFactory{}, labelNames) } // RegisterMetrics registers metrics. func RegisterMetrics(registry *prometheus.Registry) { - if finishedSizeGauge == nil { + if defaultMetrics == nil || defaultMetrics.finishedSizeGauge == nil { return } - registry.MustRegister(finishedSizeGauge) - registry.MustRegister(finishedRowsGauge) - registry.MustRegister(estimateTotalRowsCounter) - registry.MustRegister(finishedTablesCounter) - registry.MustRegister(writeTimeHistogram) - registry.MustRegister(receiveWriteChunkTimeHistogram) - registry.MustRegister(errorCount) - registry.MustRegister(taskChannelCapacity) + registry.MustRegister(defaultMetrics.finishedSizeGauge) + registry.MustRegister(defaultMetrics.finishedRowsGauge) + registry.MustRegister(defaultMetrics.estimateTotalRowsCounter) + registry.MustRegister(defaultMetrics.finishedTablesCounter) + registry.MustRegister(defaultMetrics.writeTimeHistogram) + registry.MustRegister(defaultMetrics.receiveWriteChunkTimeHistogram) + registry.MustRegister(defaultMetrics.errorCount) + registry.MustRegister(defaultMetrics.taskChannelCapacity) } // RemoveLabelValuesWithTaskInMetrics removes metrics of specified labels. func RemoveLabelValuesWithTaskInMetrics(labels prometheus.Labels) { - if finishedSizeGauge == nil { + if defaultMetrics.finishedSizeGauge == nil { return } - finishedSizeGauge.Delete(labels) - finishedRowsGauge.Delete(labels) - estimateTotalRowsCounter.Delete(labels) - finishedTablesCounter.Delete(labels) - writeTimeHistogram.Delete(labels) - receiveWriteChunkTimeHistogram.Delete(labels) - errorCount.Delete(labels) - taskChannelCapacity.Delete(labels) + defaultMetrics.finishedSizeGauge.Delete(labels) + defaultMetrics.finishedRowsGauge.Delete(labels) + defaultMetrics.estimateTotalRowsCounter.Delete(labels) + defaultMetrics.finishedTablesCounter.Delete(labels) + defaultMetrics.writeTimeHistogram.Delete(labels) + defaultMetrics.receiveWriteChunkTimeHistogram.Delete(labels) + defaultMetrics.errorCount.Delete(labels) + defaultMetrics.taskChannelCapacity.Delete(labels) } // ReadCounter reports the current value of the counter. diff --git a/dumpling/export/sql_test.go b/dumpling/export/sql_test.go index 74df4557c6..69a7b151aa 100644 --- a/dumpling/export/sql_test.go +++ b/dumpling/export/sql_test.go @@ -16,6 +16,7 @@ import ( "testing" "github.com/go-sql-driver/mysql" + "github.com/pingcap/tidb/util/promutil" "github.com/DATA-DOG/go-sqlmock" "github.com/pingcap/errors" @@ -535,11 +536,13 @@ func TestBuildTableSampleQueries(t *testing.T) { require.NoError(t, err) baseConn := newBaseConn(conn, true, nil) tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel() + metrics := newMetrics(&promutil.PlainNoAutoRegisterFactory{}, []string{}) d := &Dumper{ tctx: tctx, conf: DefaultConfig(), cancelCtx: cancel, + metrics: metrics, selectTiDBTableRegionFunc: selectTiDBTableRegion, } d.conf.ServerInfo = version.ServerInfo{ @@ -945,11 +948,13 @@ func TestBuildRegionQueriesWithoutPartition(t *testing.T) { require.NoError(t, err) baseConn := newBaseConn(conn, true, nil) tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel() + metrics := newMetrics(&promutil.PlainNoAutoRegisterFactory{}, []string{}) d := &Dumper{ tctx: tctx, conf: DefaultConfig(), cancelCtx: cancel, + metrics: metrics, selectTiDBTableRegionFunc: selectTiDBTableRegion, } d.conf.ServerInfo = version.ServerInfo{ @@ -1104,11 +1109,13 @@ func TestBuildRegionQueriesWithPartitions(t *testing.T) { require.NoError(t, err) baseConn := newBaseConn(conn, true, nil) tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel() + metrics := newMetrics(&promutil.PlainNoAutoRegisterFactory{}, []string{}) d := &Dumper{ tctx: tctx, conf: DefaultConfig(), cancelCtx: cancel, + metrics: metrics, selectTiDBTableRegionFunc: selectTiDBTableRegion, } d.conf.ServerInfo = version.ServerInfo{ @@ -1360,10 +1367,13 @@ func TestBuildVersion3RegionQueries(t *testing.T) { {"t4", 0, TableTypeBase}, }, } + metrics := newMetrics(&promutil.PlainNoAutoRegisterFactory{}, []string{}) + d := &Dumper{ tctx: tctx, conf: conf, cancelCtx: cancel, + metrics: metrics, selectTiDBTableRegionFunc: selectTiDBTableRegion, } showStatsHistograms := buildMockNewRows(mock, []string{"Db_name", "Table_name", "Partition_name", "Column_name", "Is_index", "Update_time", "Distinct_count", "Null_count", "Avg_col_size", "Correlation"}, diff --git a/dumpling/export/status.go b/dumpling/export/status.go index 359eae8ed7..458b2bb872 100644 --- a/dumpling/export/status.go +++ b/dumpling/export/status.go @@ -54,10 +54,10 @@ func (d *Dumper) GetParameters() (midparams *Midparams) { conf := d.conf mid := &Midparams{} mid.TotalTables = atomic.LoadInt64(&d.totalTables) - mid.CompletedTables = ReadCounter(finishedTablesCounter, conf.Labels) - mid.FinishedBytes = ReadGauge(finishedSizeGauge, conf.Labels) - mid.FinishedRows = ReadGauge(finishedRowsGauge, conf.Labels) - mid.EstimateTotalRows = ReadCounter(estimateTotalRowsCounter, conf.Labels) + mid.CompletedTables = ReadCounter(d.metrics.finishedTablesCounter, conf.Labels) + mid.FinishedBytes = ReadGauge(d.metrics.finishedSizeGauge, conf.Labels) + mid.FinishedRows = ReadGauge(d.metrics.finishedRowsGauge, conf.Labels) + mid.EstimateTotalRows = ReadCounter(d.metrics.estimateTotalRowsCounter, conf.Labels) return mid } diff --git a/dumpling/export/status_test.go b/dumpling/export/status_test.go index a002ac3365..ca1434b12f 100644 --- a/dumpling/export/status_test.go +++ b/dumpling/export/status_test.go @@ -12,6 +12,7 @@ func TestGetParameters(t *testing.T) { conf := defaultConfigForTest(t) d := &Dumper{conf: conf} InitMetricsVector(conf.Labels) + d.metrics = defaultMetrics mid := d.GetParameters() require.EqualValues(t, float64(0), mid.CompletedTables) @@ -19,10 +20,10 @@ func TestGetParameters(t *testing.T) { require.EqualValues(t, float64(0), mid.FinishedRows) require.EqualValues(t, float64(0), mid.EstimateTotalRows) - AddCounter(finishedTablesCounter, conf.Labels, 10) - AddGauge(finishedSizeGauge, conf.Labels, 20) - AddGauge(finishedRowsGauge, conf.Labels, 30) - AddCounter(estimateTotalRowsCounter, conf.Labels, 40) + AddCounter(defaultMetrics.finishedTablesCounter, conf.Labels, 10) + AddGauge(defaultMetrics.finishedSizeGauge, conf.Labels, 20) + AddGauge(defaultMetrics.finishedRowsGauge, conf.Labels, 30) + AddCounter(defaultMetrics.estimateTotalRowsCounter, conf.Labels, 40) mid = d.GetParameters() require.EqualValues(t, float64(10), mid.CompletedTables) diff --git a/dumpling/export/writer.go b/dumpling/export/writer.go index 2f0668adc5..9407a6b8cd 100644 --- a/dumpling/export/writer.go +++ b/dumpling/export/writer.go @@ -26,6 +26,7 @@ type Writer struct { conn *sql.Conn extStorage storage.ExternalStorage fileFmt FileFormat + metrics *metrics receivedTaskCount int @@ -35,13 +36,21 @@ type Writer struct { } // NewWriter returns a new Writer with given configurations -func NewWriter(tctx *tcontext.Context, id int64, config *Config, conn *sql.Conn, externalStore storage.ExternalStorage) *Writer { +func NewWriter( + tctx *tcontext.Context, + id int64, + config *Config, + conn *sql.Conn, + externalStore storage.ExternalStorage, + metrics *metrics, +) *Writer { sw := &Writer{ id: id, tctx: tctx, conf: config, conn: conn, extStorage: externalStore, + metrics: metrics, finishTaskCallBack: func(Task) {}, finishTableCallBack: func(Task) {}, } @@ -185,7 +194,7 @@ func (w *Writer) WriteTableData(meta TableMeta, ir TableDataIR, currentChunk int defer func() { lastErr = err if err != nil { - IncCounter(errorCount, conf.Labels) + IncCounter(w.metrics.errorCount, conf.Labels) } }() retryTime++ @@ -225,7 +234,7 @@ func (w *Writer) tryToWriteTableData(tctx *tcontext.Context, meta TableMeta, ir somethingIsWritten := false for { fileWriter, tearDown := buildInterceptFileWriter(tctx, w.extStorage, fileName, conf.CompressType) - n, err := format.WriteInsert(tctx, conf, meta, ir, fileWriter) + n, err := format.WriteInsert(tctx, conf, meta, ir, fileWriter, w.metrics) tearDown(tctx) if err != nil { return err diff --git a/dumpling/export/writer_serial_test.go b/dumpling/export/writer_serial_test.go index bc392adf6c..02251cb925 100644 --- a/dumpling/export/writer_serial_test.go +++ b/dumpling/export/writer_serial_test.go @@ -52,7 +52,7 @@ func TestWriteInsert(t *testing.T) { bf := storage.NewBufferWriter() conf := configForWriteSQL(cfg, UnspecifiedSize, UnspecifiedSize) - n, err := WriteInsert(tcontext.Background(), conf, tableIR, tableIR, bf) + n, err := WriteInsert(tcontext.Background(), conf, tableIR, tableIR, bf, defaultMetrics) require.NoError(t, err) require.Equal(t, uint64(4), n) @@ -64,8 +64,8 @@ func TestWriteInsert(t *testing.T) { "(3,'male','john@mail.com','020-1256','healthy'),\n" + "(4,'female','sarah@mail.com','020-1235','healthy');\n" require.Equal(t, expected, bf.String()) - require.Equal(t, ReadGauge(finishedRowsGauge, conf.Labels), float64(len(data))) - require.Equal(t, ReadGauge(finishedSizeGauge, conf.Labels), float64(len(expected))) + require.Equal(t, ReadGauge(defaultMetrics.finishedRowsGauge, conf.Labels), float64(len(data))) + require.Equal(t, ReadGauge(defaultMetrics.finishedSizeGauge, conf.Labels), float64(len(expected))) } func TestWriteInsertReturnsError(t *testing.T) { @@ -90,7 +90,7 @@ func TestWriteInsertReturnsError(t *testing.T) { bf := storage.NewBufferWriter() conf := configForWriteSQL(cfg, UnspecifiedSize, UnspecifiedSize) - n, err := WriteInsert(tcontext.Background(), conf, tableIR, tableIR, bf) + n, err := WriteInsert(tcontext.Background(), conf, tableIR, tableIR, bf, defaultMetrics) require.ErrorIs(t, err, rowErr) require.Equal(t, uint64(3), n) @@ -102,8 +102,8 @@ func TestWriteInsertReturnsError(t *testing.T) { "(3,'male','john@mail.com','020-1256','healthy');\n" require.Equal(t, expected, bf.String()) // error occurred, should revert pointer to zero - require.Equal(t, ReadGauge(finishedRowsGauge, conf.Labels), float64(0)) - require.Equal(t, ReadGauge(finishedSizeGauge, conf.Labels), float64(0)) + require.Equal(t, ReadGauge(defaultMetrics.finishedRowsGauge, conf.Labels), float64(0)) + require.Equal(t, ReadGauge(defaultMetrics.finishedSizeGauge, conf.Labels), float64(0)) } func TestWriteInsertInCsv(t *testing.T) { @@ -123,7 +123,7 @@ func TestWriteInsertInCsv(t *testing.T) { // test nullValue opt := &csvOption{separator: []byte(","), delimiter: []byte{'"'}, nullValue: "\\N"} conf := configForWriteCSV(cfg, true, opt) - n, err := WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf) + n, err := WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf, defaultMetrics) require.Equal(t, uint64(4), n) require.NoError(t, err) @@ -132,8 +132,8 @@ func TestWriteInsertInCsv(t *testing.T) { "3,\"male\",\"john@mail.com\",\"020-1256\",\"healthy\"\r\n" + "4,\"female\",\"sarah@mail.com\",\"020-1235\",\"healthy\"\r\n" require.Equal(t, expected, bf.String()) - require.Equal(t, float64(len(data)), ReadGauge(finishedRowsGauge, conf.Labels)) - require.Equal(t, float64(len(expected)), ReadGauge(finishedSizeGauge, conf.Labels)) + require.Equal(t, float64(len(data)), ReadGauge(defaultMetrics.finishedRowsGauge, conf.Labels)) + require.Equal(t, float64(len(expected)), ReadGauge(defaultMetrics.finishedSizeGauge, conf.Labels)) RemoveLabelValuesWithTaskInMetrics(conf.Labels) @@ -142,7 +142,7 @@ func TestWriteInsertInCsv(t *testing.T) { opt.delimiter = quotationMark tableIR = newMockTableIR("test", "employee", data, nil, colTypes) conf = configForWriteCSV(cfg, true, opt) - n, err = WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf) + n, err = WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf, defaultMetrics) require.Equal(t, uint64(4), n) require.NoError(t, err) @@ -151,8 +151,8 @@ func TestWriteInsertInCsv(t *testing.T) { "3,'male','john@mail.com','020-1256','healthy'\r\n" + "4,'female','sarah@mail.com','020-1235','healthy'\r\n" require.Equal(t, expected, bf.String()) - require.Equal(t, float64(len(data)), ReadGauge(finishedRowsGauge, conf.Labels)) - require.Equal(t, float64(len(expected)), ReadGauge(finishedSizeGauge, conf.Labels)) + require.Equal(t, float64(len(data)), ReadGauge(defaultMetrics.finishedRowsGauge, conf.Labels)) + require.Equal(t, float64(len(expected)), ReadGauge(defaultMetrics.finishedSizeGauge, conf.Labels)) RemoveLabelValuesWithTaskInMetrics(conf.Labels) @@ -161,7 +161,7 @@ func TestWriteInsertInCsv(t *testing.T) { opt.separator = []byte(";") tableIR = newMockTableIR("test", "employee", data, nil, colTypes) conf = configForWriteCSV(cfg, true, opt) - n, err = WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf) + n, err = WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf, defaultMetrics) require.Equal(t, uint64(4), n) require.NoError(t, err) @@ -170,8 +170,8 @@ func TestWriteInsertInCsv(t *testing.T) { "3;'male';'john@mail.com';'020-1256';'healthy'\r\n" + "4;'female';'sarah@mail.com';'020-1235';'healthy'\r\n" require.Equal(t, expected, bf.String()) - require.Equal(t, float64(len(data)), ReadGauge(finishedRowsGauge, conf.Labels)) - require.Equal(t, float64(len(expected)), ReadGauge(finishedSizeGauge, conf.Labels)) + require.Equal(t, float64(len(data)), ReadGauge(defaultMetrics.finishedRowsGauge, conf.Labels)) + require.Equal(t, float64(len(expected)), ReadGauge(defaultMetrics.finishedSizeGauge, conf.Labels)) RemoveLabelValuesWithTaskInMetrics(conf.Labels) @@ -182,7 +182,7 @@ func TestWriteInsertInCsv(t *testing.T) { tableIR = newMockTableIR("test", "employee", data, nil, colTypes) tableIR.colNames = []string{"id", "gender", "email", "phone_number", "status"} conf = configForWriteCSV(cfg, false, opt) - n, err = WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf) + n, err = WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf, defaultMetrics) require.Equal(t, uint64(4), n) require.NoError(t, err) @@ -192,8 +192,8 @@ func TestWriteInsertInCsv(t *testing.T) { "3&;,?mamamalema&;,?majohn@mamail.comma&;,?ma020-1256ma&;,?mahealthyma\r\n" + "4&;,?mafemamalema&;,?masarah@mamail.comma&;,?ma020-1235ma&;,?mahealthyma\r\n" require.Equal(t, expected, bf.String()) - require.Equal(t, float64(len(data)), ReadGauge(finishedRowsGauge, conf.Labels)) - require.Equal(t, float64(len(expected)), ReadGauge(finishedSizeGauge, conf.Labels)) + require.Equal(t, float64(len(data)), ReadGauge(defaultMetrics.finishedRowsGauge, conf.Labels)) + require.Equal(t, float64(len(expected)), ReadGauge(defaultMetrics.finishedSizeGauge, conf.Labels)) RemoveLabelValuesWithTaskInMetrics(conf.Labels) } @@ -219,7 +219,7 @@ func TestWriteInsertInCsvReturnsError(t *testing.T) { // test nullValue opt := &csvOption{separator: []byte(","), delimiter: []byte{'"'}, nullValue: "\\N"} conf := configForWriteCSV(cfg, true, opt) - n, err := WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf) + n, err := WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf, defaultMetrics) require.Equal(t, uint64(3), n) require.ErrorIs(t, err, rowErr) @@ -227,8 +227,8 @@ func TestWriteInsertInCsvReturnsError(t *testing.T) { "2,\"female\",\"sarah@mail.com\",\"020-1253\",\"healthy\"\r\n" + "3,\"male\",\"john@mail.com\",\"020-1256\",\"healthy\"\r\n" require.Equal(t, expected, bf.String()) - require.Equal(t, float64(0), ReadGauge(finishedRowsGauge, conf.Labels)) - require.Equal(t, float64(0), ReadGauge(finishedSizeGauge, conf.Labels)) + require.Equal(t, float64(0), ReadGauge(defaultMetrics.finishedRowsGauge, conf.Labels)) + require.Equal(t, float64(0), ReadGauge(defaultMetrics.finishedSizeGauge, conf.Labels)) RemoveLabelValuesWithTaskInMetrics(conf.Labels) } @@ -252,15 +252,15 @@ func TestSQLDataTypes(t *testing.T) { bf := storage.NewBufferWriter() conf := configForWriteSQL(cfg, UnspecifiedSize, UnspecifiedSize) - n, err := WriteInsert(tcontext.Background(), conf, tableIR, tableIR, bf) + n, err := WriteInsert(tcontext.Background(), conf, tableIR, tableIR, bf, defaultMetrics) require.NoError(t, err) require.Equal(t, uint64(1), n) lines := strings.Split(bf.String(), "\n") require.Len(t, lines, 3) require.Equal(t, fmt.Sprintf("(%s);", result), lines[1]) - require.Equal(t, float64(1), ReadGauge(finishedRowsGauge, conf.Labels)) - require.Equal(t, float64(len(bf.String())), ReadGauge(finishedSizeGauge, conf.Labels)) + require.Equal(t, float64(1), ReadGauge(defaultMetrics.finishedRowsGauge, conf.Labels)) + require.Equal(t, float64(len(bf.String())), ReadGauge(defaultMetrics.finishedSizeGauge, conf.Labels)) RemoveLabelValuesWithTaskInMetrics(conf.Labels) } @@ -316,8 +316,8 @@ func createMockConfig(t *testing.T) (cfg *Config, clean func()) { clean = func() { RemoveLabelValuesWithTaskInMetrics(cfg.Labels) - require.Equal(t, float64(0), ReadGauge(finishedRowsGauge, cfg.Labels)) - require.Equal(t, float64(0), ReadGauge(finishedSizeGauge, cfg.Labels)) + require.Equal(t, float64(0), ReadGauge(defaultMetrics.finishedRowsGauge, cfg.Labels)) + require.Equal(t, float64(0), ReadGauge(defaultMetrics.finishedSizeGauge, cfg.Labels)) } return diff --git a/dumpling/export/writer_test.go b/dumpling/export/writer_test.go index e516bfef02..e268c842b6 100644 --- a/dumpling/export/writer_test.go +++ b/dumpling/export/writer_test.go @@ -12,6 +12,7 @@ import ( "testing" "github.com/DATA-DOG/go-sqlmock" + "github.com/pingcap/tidb/util/promutil" "github.com/stretchr/testify/require" tcontext "github.com/pingcap/tidb/dumpling/context" @@ -350,7 +351,8 @@ func createTestWriter(conf *Config, t *testing.T) (w *Writer, clean func()) { conn, err := db.Conn(context.Background()) require.NoError(t, err) - w = NewWriter(tcontext.Background(), 0, conf, conn, extStore) + metrics := newMetrics(&promutil.PlainNoAutoRegisterFactory{}, []string{}) + w = NewWriter(tcontext.Background(), 0, conf, conn, extStore, metrics) clean = func() { require.NoError(t, db.Close()) } diff --git a/dumpling/export/writer_util.go b/dumpling/export/writer_util.go index 129f54e0d3..6a6574047a 100755 --- a/dumpling/export/writer_util.go +++ b/dumpling/export/writer_util.go @@ -29,10 +29,11 @@ var pool = sync.Pool{New: func() interface{} { }} type writerPipe struct { - input chan *bytes.Buffer - closed chan struct{} - errCh chan error - labels prometheus.Labels + input chan *bytes.Buffer + closed chan struct{} + errCh chan error + metrics *metrics + labels prometheus.Labels finishedFileSize uint64 currentFileSize uint64 @@ -44,13 +45,20 @@ type writerPipe struct { w storage.ExternalFileWriter } -func newWriterPipe(w storage.ExternalFileWriter, fileSizeLimit, statementSizeLimit uint64, labels prometheus.Labels) *writerPipe { +func newWriterPipe( + w storage.ExternalFileWriter, + fileSizeLimit, + statementSizeLimit uint64, + metrics *metrics, + labels prometheus.Labels, +) *writerPipe { return &writerPipe{ - input: make(chan *bytes.Buffer, 8), - closed: make(chan struct{}), - errCh: make(chan error, 1), - w: w, - labels: labels, + input: make(chan *bytes.Buffer, 8), + closed: make(chan struct{}), + errCh: make(chan error, 1), + w: w, + metrics: metrics, + labels: labels, currentFileSize: 0, currentStatementSize: 0, @@ -72,11 +80,11 @@ func (b *writerPipe) Run(tctx *tcontext.Context) { if errOccurs { continue } - ObserveHistogram(receiveWriteChunkTimeHistogram, b.labels, time.Since(receiveChunkTime).Seconds()) + ObserveHistogram(b.metrics.receiveWriteChunkTimeHistogram, b.labels, time.Since(receiveChunkTime).Seconds()) receiveChunkTime = time.Now() err := writeBytes(tctx, b.w, s.Bytes()) - ObserveHistogram(writeTimeHistogram, b.labels, time.Since(receiveChunkTime).Seconds()) - AddGauge(finishedSizeGauge, b.labels, float64(s.Len())) + ObserveHistogram(b.metrics.writeTimeHistogram, b.labels, time.Since(receiveChunkTime).Seconds()) + AddGauge(b.metrics.finishedSizeGauge, b.labels, float64(s.Len())) b.finishedFileSize += uint64(s.Len()) s.Reset() pool.Put(s) @@ -134,7 +142,14 @@ func WriteMeta(tctx *tcontext.Context, meta MetaIR, w storage.ExternalFileWriter } // WriteInsert writes TableDataIR to a storage.ExternalFileWriter in sql type -func WriteInsert(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR TableDataIR, w storage.ExternalFileWriter) (n uint64, err error) { +func WriteInsert( + pCtx *tcontext.Context, + cfg *Config, + meta TableMeta, + tblIR TableDataIR, + w storage.ExternalFileWriter, + metrics *metrics, +) (n uint64, err error) { fileRowIter := tblIR.Rows() if !fileRowIter.HasNext() { return 0, fileRowIter.Error() @@ -145,7 +160,7 @@ func WriteInsert(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR Tabl bf.Grow(lengthLimit - bfCap) } - wp := newWriterPipe(w, cfg.FileSize, cfg.StatementSize, cfg.Labels) + wp := newWriterPipe(w, cfg.FileSize, cfg.StatementSize, metrics, cfg.Labels) // use context.Background here to make sure writerPipe can deplete all the chunks in pipeline ctx, cancel := tcontext.Background().WithLogger(pCtx.L()).WithCancel() @@ -183,8 +198,8 @@ func WriteInsert(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR Tabl zap.Uint64("finished rows", lastCounter), zap.Uint64("finished size", wp.finishedFileSize), log.ShortError(err)) - SubGauge(finishedRowsGauge, cfg.Labels, float64(lastCounter)) - SubGauge(finishedSizeGauge, cfg.Labels, float64(wp.finishedFileSize)) + SubGauge(metrics.finishedRowsGauge, cfg.Labels, float64(lastCounter)) + SubGauge(metrics.finishedSizeGauge, cfg.Labels, float64(wp.finishedFileSize)) } else { pCtx.L().Debug("finish dumping table(chunk)", zap.String("database", meta.DatabaseName()), @@ -247,7 +262,7 @@ func WriteInsert(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR Tabl if bfCap := bf.Cap(); bfCap < lengthLimit { bf.Grow(lengthLimit - bfCap) } - AddGauge(finishedRowsGauge, cfg.Labels, float64(counter-lastCounter)) + AddGauge(metrics.finishedRowsGauge, cfg.Labels, float64(counter-lastCounter)) lastCounter = counter } } @@ -265,7 +280,7 @@ func WriteInsert(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR Tabl } close(wp.input) <-wp.closed - AddGauge(finishedRowsGauge, cfg.Labels, float64(counter-lastCounter)) + AddGauge(metrics.finishedRowsGauge, cfg.Labels, float64(counter-lastCounter)) lastCounter = counter if err = fileRowIter.Error(); err != nil { return counter, errors.Trace(err) @@ -274,7 +289,14 @@ func WriteInsert(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR Tabl } // WriteInsertInCsv writes TableDataIR to a storage.ExternalFileWriter in csv type -func WriteInsertInCsv(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR TableDataIR, w storage.ExternalFileWriter) (n uint64, err error) { +func WriteInsertInCsv( + pCtx *tcontext.Context, + cfg *Config, + meta TableMeta, + tblIR TableDataIR, + w storage.ExternalFileWriter, + metrics *metrics, +) (n uint64, err error) { fileRowIter := tblIR.Rows() if !fileRowIter.HasNext() { return 0, fileRowIter.Error() @@ -285,7 +307,7 @@ func WriteInsertInCsv(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR bf.Grow(lengthLimit - bfCap) } - wp := newWriterPipe(w, cfg.FileSize, UnspecifiedSize, cfg.Labels) + wp := newWriterPipe(w, cfg.FileSize, UnspecifiedSize, metrics, cfg.Labels) opt := &csvOption{ nullValue: cfg.CsvNullValue, separator: []byte(cfg.CsvSeparator), @@ -321,8 +343,8 @@ func WriteInsertInCsv(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR zap.Uint64("finished rows", lastCounter), zap.Uint64("finished size", wp.finishedFileSize), log.ShortError(err)) - SubGauge(finishedRowsGauge, cfg.Labels, float64(lastCounter)) - SubGauge(finishedSizeGauge, cfg.Labels, float64(wp.finishedFileSize)) + SubGauge(metrics.finishedRowsGauge, cfg.Labels, float64(lastCounter)) + SubGauge(metrics.finishedSizeGauge, cfg.Labels, float64(wp.finishedFileSize)) } else { pCtx.L().Debug("finish dumping table(chunk)", zap.String("database", meta.DatabaseName()), @@ -372,7 +394,7 @@ func WriteInsertInCsv(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR if bfCap := bf.Cap(); bfCap < lengthLimit { bf.Grow(lengthLimit - bfCap) } - AddGauge(finishedRowsGauge, cfg.Labels, float64(counter-lastCounter)) + AddGauge(metrics.finishedRowsGauge, cfg.Labels, float64(counter-lastCounter)) lastCounter = counter } } @@ -388,7 +410,7 @@ func WriteInsertInCsv(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR } close(wp.input) <-wp.closed - AddGauge(finishedRowsGauge, cfg.Labels, float64(counter-lastCounter)) + AddGauge(metrics.finishedRowsGauge, cfg.Labels, float64(counter-lastCounter)) lastCounter = counter if err = fileRowIter.Error(); err != nil { return counter, errors.Trace(err) @@ -620,12 +642,19 @@ func (f FileFormat) Extension() string { } // WriteInsert writes TableDataIR to a storage.ExternalFileWriter in sql/csv type -func (f FileFormat) WriteInsert(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR TableDataIR, w storage.ExternalFileWriter) (uint64, error) { +func (f FileFormat) WriteInsert( + pCtx *tcontext.Context, + cfg *Config, + meta TableMeta, + tblIR TableDataIR, + w storage.ExternalFileWriter, + metrics *metrics, +) (uint64, error) { switch f { case FileFormatSQLText: - return WriteInsert(pCtx, cfg, meta, tblIR, w) + return WriteInsert(pCtx, cfg, meta, tblIR, w, metrics) case FileFormatCSV: - return WriteInsertInCsv(pCtx, cfg, meta, tblIR, w) + return WriteInsertInCsv(pCtx, cfg, meta, tblIR, w, metrics) default: return 0, errors.Errorf("unknown file format") } diff --git a/util/promutil/interface.go b/util/promutil/interface.go new file mode 100644 index 0000000000..94800b50cb --- /dev/null +++ b/util/promutil/interface.go @@ -0,0 +1,51 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package promutil + +import "github.com/prometheus/client_golang/prometheus" + +// Factory is the interface to create some native prometheus metric +// copied from tiflow/engine/pkg/promutil/factory.go. +type Factory interface { + // NewCounter works like the function of the same name in the prometheus + // package, but it automatically registers the Counter with the Factory's + // Registerer. Panic if it can't register successfully. + NewCounter(opts prometheus.CounterOpts) prometheus.Counter + + // NewCounterVec works like the function of the same name in the + // prometheus, package but it automatically registers the CounterVec with + // the Factory's Registerer. Panic if it can't register successfully. + NewCounterVec(opts prometheus.CounterOpts, labelNames []string) *prometheus.CounterVec + + // NewGauge works like the function of the same name in the prometheus + // package, but it automatically registers the Gauge with the Factory's + // Registerer. Panic if it can't register successfully. + NewGauge(opts prometheus.GaugeOpts) prometheus.Gauge + + // NewGaugeVec works like the function of the same name in the prometheus + // package but it automatically registers the GaugeVec with the Factory's + // Registerer. Panic if it can't register successfully. + NewGaugeVec(opts prometheus.GaugeOpts, labelNames []string) *prometheus.GaugeVec + + // NewHistogram works like the function of the same name in the prometheus + // package but it automatically registers the Histogram with the Factory's + // Registerer. Panic if it can't register successfully. + NewHistogram(opts prometheus.HistogramOpts) prometheus.Histogram + + // NewHistogramVec works like the function of the same name in the + // prometheus package but it automatically registers the HistogramVec + // with the Factory's Registerer. Panic if it can't register successfully. + NewHistogramVec(opts prometheus.HistogramOpts, labelNames []string) *prometheus.HistogramVec +} diff --git a/util/promutil/plain.go b/util/promutil/plain.go new file mode 100644 index 0000000000..1d63385390 --- /dev/null +++ b/util/promutil/plain.go @@ -0,0 +1,53 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package promutil + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// PlainNoAutoRegisterFactory implements Factory. Note that it will not automatically +// Register the metrics created by its methods as the Factory interface said. +type PlainNoAutoRegisterFactory struct{} + +// NewCounter implements Factory.NewCounter. +func (f *PlainNoAutoRegisterFactory) NewCounter(opts prometheus.CounterOpts) prometheus.Counter { + return prometheus.NewCounter(opts) +} + +// NewCounterVec implements Factory.NewCounterVec. +func (f *PlainNoAutoRegisterFactory) NewCounterVec(opts prometheus.CounterOpts, labelNames []string) *prometheus.CounterVec { + return prometheus.NewCounterVec(opts, labelNames) +} + +// NewGauge implements Factory.NewGauge. +func (f *PlainNoAutoRegisterFactory) NewGauge(opts prometheus.GaugeOpts) prometheus.Gauge { + return prometheus.NewGauge(opts) +} + +// NewGaugeVec implements Factory.NewGaugeVec. +func (f *PlainNoAutoRegisterFactory) NewGaugeVec(opts prometheus.GaugeOpts, labelNames []string) *prometheus.GaugeVec { + return prometheus.NewGaugeVec(opts, labelNames) +} + +// NewHistogram implements Factory.NewHistogram. +func (f *PlainNoAutoRegisterFactory) NewHistogram(opts prometheus.HistogramOpts) prometheus.Histogram { + return prometheus.NewHistogram(opts) +} + +// NewHistogramVec implements Factory.NewHistogramVec. +func (f *PlainNoAutoRegisterFactory) NewHistogramVec(opts prometheus.HistogramOpts, labelNames []string) *prometheus.HistogramVec { + return prometheus.NewHistogramVec(opts, labelNames) +}