From fbb497f83a2abeee7aba67d01ac290697ad4d78b Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 18 Apr 2022 19:56:02 +0800 Subject: [PATCH] lightning: enhance request log in server-mode and restore progress (#33718) close pingcap/tidb#33715 --- br/pkg/lightning/backend/local/local.go | 2 +- br/pkg/lightning/lightning.go | 63 ++++++++++-- br/pkg/lightning/metric/metric.go | 19 +++- br/pkg/lightning/restore/restore.go | 114 ++++++++++++++++------ br/pkg/lightning/restore/table_restore.go | 2 +- 5 files changed, 159 insertions(+), 41 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index ae5d292d04..d813ca7252 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1243,7 +1243,7 @@ loopWrite: engine.importedKVSize.Add(rangeStats.totalBytes) engine.importedKVCount.Add(rangeStats.count) engine.finishedRanges.add(finishedRange) - metric.BytesCounter.WithLabelValues(metric.TableStateImported).Add(float64(rangeStats.totalBytes)) + metric.BytesCounter.WithLabelValues(metric.BytesStateImported).Add(float64(rangeStats.totalBytes)) } return errors.Trace(err) } diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 4ad8622c8d..291d5ff6f3 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -133,6 +133,50 @@ func (l *Lightning) GoServe() error { return l.goServe(statusAddr, io.Discard) } +// TODO: maybe handle http request using gin +type loggingResponseWriter struct { + http.ResponseWriter + statusCode int + body string +} + +func newLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter { + return &loggingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK} +} + +func (lrw *loggingResponseWriter) WriteHeader(code int) { + lrw.statusCode = code + lrw.ResponseWriter.WriteHeader(code) +} + +func (lrw *loggingResponseWriter) Write(d []byte) (int, error) { + // keep first part of the response for logging, max 1K + if lrw.body == "" && len(d) > 0 { + length := len(d) + if length > 1024 { + length = 1024 + } + lrw.body = string(d[:length]) + } + return lrw.ResponseWriter.Write(d) +} + +func httpHandleWrapper(h http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + logger := log.L().With(zap.String("method", r.Method), zap.Stringer("url", r.URL)). 
+			Begin(zapcore.InfoLevel, "process http request")
+
+		newWriter := newLoggingResponseWriter(w)
+		h.ServeHTTP(newWriter, r)
+
+		bodyField := zap.Skip()
+		if newWriter.Header().Get("Content-Encoding") != "gzip" {
+			bodyField = zap.String("body", newWriter.body)
+		}
+		logger.End(zapcore.InfoLevel, nil, zap.Int("status", newWriter.statusCode), bodyField)
+	}
+}
+
 func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
 	mux := http.NewServeMux()
 	mux.Handle("/", http.RedirectHandler("/web/", http.StatusFound))
@@ -145,13 +189,13 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
 	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
 
 	handleTasks := http.StripPrefix("/tasks", http.HandlerFunc(l.handleTask))
-	mux.Handle("/tasks", handleTasks)
-	mux.Handle("/tasks/", handleTasks)
-	mux.HandleFunc("/progress/task", handleProgressTask)
-	mux.HandleFunc("/progress/table", handleProgressTable)
-	mux.HandleFunc("/pause", handlePause)
-	mux.HandleFunc("/resume", handleResume)
-	mux.HandleFunc("/loglevel", handleLogLevel)
+	mux.Handle("/tasks", httpHandleWrapper(handleTasks.ServeHTTP))
+	mux.Handle("/tasks/", httpHandleWrapper(handleTasks.ServeHTTP))
+	mux.HandleFunc("/progress/task", httpHandleWrapper(handleProgressTask))
+	mux.HandleFunc("/progress/table", httpHandleWrapper(handleProgressTable))
+	mux.HandleFunc("/pause", httpHandleWrapper(handlePause))
+	mux.HandleFunc("/resume", httpHandleWrapper(handleResume))
+	mux.HandleFunc("/loglevel", httpHandleWrapper(handleLogLevel))
 
 	mux.Handle("/web/", http.StripPrefix("/web", httpgzip.FileServer(web.Res, httpgzip.FileServerOptions{
 		IndexHTML: true,
@@ -215,7 +259,8 @@ func (l *Lightning) RunServer() error {
 		if err != nil {
 			return err
 		}
-		err = l.run(context.Background(), task, nil)
+		o := &options{}
+		err = l.run(context.Background(), task, o)
 		if err != nil && !common.IsContextCanceledError(err) {
 			restore.DeliverPauser.Pause() // force pause the progress on error
 			log.L().Error("tidb lightning encountered error", zap.Error(err))
@@ -559,7 +604,7 @@ func (l *Lightning) handlePostTask(w http.ResponseWriter, req *http.Request) {
 		writeJSONError(w, http.StatusBadRequest, "cannot read request", err)
 		return
 	}
-	log.L().Debug("received task config", zap.ByteString("content", data))
+	log.L().Info("received task config", zap.ByteString("content", data))
 
 	cfg := config.NewConfig()
 	if err = cfg.LoadFromGlobal(l.globalCfg); err != nil {
diff --git a/br/pkg/lightning/metric/metric.go b/br/pkg/lightning/metric/metric.go
index 984b14c846..6121ea6935 100644
--- a/br/pkg/lightning/metric/metric.go
+++ b/br/pkg/lightning/metric/metric.go
@@ -24,10 +24,18 @@ import (
 const (
 	// states used for the TableCounter labels
 	TableStatePending   = "pending"
-	TableStateWritten   = "written"
 	TableStateImported  = "imported"
 	TableStateCompleted = "completed"
 
+	BytesStateTotalRestore   = "total_restore" // total source data bytes that need to be restored
+	BytesStateRestored       = "restored"      // source data bytes restored during the restore-engine phase
+	BytesStateRestoreWritten = "written"       // bytes written during the restore-engine phase
+	BytesStateImported       = "imported"      // bytes imported during the import-engine phase
+
+	ProgressPhaseTotal   = "total"   // total restore progress (excludes post-processing such as checksum and analyze)
+	ProgressPhaseRestore = "restore" // progress of the restore-engine phase
+	ProgressPhaseImport  = "import"  // progress of the import-engine phase
+
 	// results used for the TableCounter labels
 	TableResultSuccess = "success"
 	TableResultFailure = "failure"
@@ -193,6 +201,14 @@ var (
 			Help: "disk/memory size currently occupied by intermediate files in local backend",
 		}, []string{"medium"},
 	)
+
+	ProgressGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "lightning",
+			Name:      "progress",
+			Help:      "progress of lightning phase",
+		}, []string{"phase"},
+	)
 )
 
 //nolint:gochecknoinits // TODO: refactor
@@ -216,6 +232,7 @@ func init() {
 	prometheus.MustRegister(ChunkParserReadBlockSecondsHistogram)
 	prometheus.MustRegister(ApplyWorkerSecondsHistogram)
 	prometheus.MustRegister(LocalStorageUsageBytesGauge)
+	prometheus.MustRegister(ProgressGauge)
 }
 
 func RecordTableCount(status string, err error) {
diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go
index d181b0882f..a7c86b9949 100644
--- a/br/pkg/lightning/restore/restore.go
+++ b/br/pkg/lightning/restore/restore.go
@@ -1162,6 +1162,8 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
 		case <-logProgressChan:
 			// log the current progress periodically, so OPS will know that we're still working
 			nanoseconds := float64(time.Since(start).Nanoseconds())
+			totalRestoreBytes := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.BytesStateTotalRestore))
+			restoredBytes := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.BytesStateRestored))
 			// the estimated chunk is not accurate(likely under estimated), but the actual count is not accurate
 			// before the last table start, so use the bigger of the two should be a workaround
 			estimated := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated))
@@ -1179,8 +1181,8 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
 				engineEstimated = enginePending
 			}
 			engineFinished := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.TableStateImported, metric.TableResultSuccess))
-			bytesWritten := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.TableStateWritten))
-			bytesImported := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.TableStateImported))
+			bytesWritten := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.BytesStateRestoreWritten))
+			bytesImported := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.BytesStateImported))
 
 			var state string
 			var remaining zap.Field
@@ -1197,37 +1199,64 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
 				state = "preparing"
 			}
 
-			// since we can't accurately estimate the extra time cost by import after all writing are finished,
-			// so here we use estimatedWritingProgress * 0.8 + estimatedImportingProgress * 0.2 as the total
-			// progress.
+			// Lightning restore is split into a restore-engine phase and an import-engine phase. The two phases
+			// are parallelized and pipelined across engines, so the total progress can only be a weighted
+			// combination of the two phase percentages.
+			//
+			// for the local & importer backends:
+			// the import engine is usually faster since it involves little computation, and within one engine
+			// restore and import are serialized, so the two percentages should not differ by much; because the
+			// import engine determines when the whole restore finishes, we simply average them for now. The
+			// resulting progress may fall behind the real progress when import is faster.
+			//
+			// for the tidb backend, nothing happens during the import-engine phase, so the restore-engine
+			// progress is used as the total progress.
+			restoreBytesField := zap.Skip()
+			importBytesField := zap.Skip()
 			remaining = zap.Skip()
 			totalPercent := 0.0
-			if finished > 0 {
-				writePercent := math.Min(finished/estimated, 1.0)
-				importPercent := 1.0
-				if bytesWritten > 0 {
-					totalBytes := bytesWritten / writePercent
-					importPercent = math.Min(bytesImported/totalBytes, 1.0)
+			if restoredBytes > 0 {
+				restorePercent := math.Min(restoredBytes/totalRestoreBytes, 1.0)
+				metric.ProgressGauge.WithLabelValues(metric.ProgressPhaseRestore).Set(restorePercent)
+				if rc.cfg.TikvImporter.Backend != config.BackendTiDB {
+					var importPercent float64
+					if bytesWritten > 0 {
+						// estimate the total import bytes from the written bytes:
+						// when importPercent = 1, totalImportBytes = bytesWritten, but bytesImported may end up
+						// bigger or smaller than bytesWritten (for example when deduplication kicks in), so we
+						// also compute the progress from engine counts and take the bigger of the two in case
+						// bytesImported is the smaller one.
+						totalImportBytes := bytesWritten / restorePercent
+						biggerPercent := math.Max(bytesImported/totalImportBytes, engineFinished/engineEstimated)
+						importPercent = math.Min(biggerPercent, 1.0)
+						importBytesField = zap.String("import-bytes", fmt.Sprintf("%s/%s(estimated)",
+							units.BytesSize(bytesImported), units.BytesSize(totalImportBytes)))
+					}
+					metric.ProgressGauge.WithLabelValues(metric.ProgressPhaseImport).Set(importPercent)
+					totalPercent = (restorePercent + importPercent) / 2
+				} else {
+					totalPercent = restorePercent
 				}
-				totalPercent = writePercent*0.8 + importPercent*0.2
 				if totalPercent < 1.0 {
 					remainNanoseconds := (1.0 - totalPercent) / totalPercent * nanoseconds
 					remaining = zap.Duration("remaining", time.Duration(remainNanoseconds).Round(time.Second))
 				}
+				restoreBytesField = zap.String("restore-bytes", fmt.Sprintf("%s/%s",
+					units.BytesSize(restoredBytes), units.BytesSize(totalRestoreBytes)))
 			}
+			metric.ProgressGauge.WithLabelValues(metric.ProgressPhaseTotal).Set(totalPercent)
 
-			formatPercent := func(finish, estimate float64) string {
-				speed := ""
-				if estimated > 0 {
-					speed = fmt.Sprintf(" (%.1f%%)", finish/estimate*100)
+			formatPercent := func(num, denom float64) string {
+				if denom > 0 {
+					return fmt.Sprintf(" (%.1f%%)", num/denom*100)
 				}
-				return speed
+				return ""
 			}
 
 			// avoid output bytes speed if there are no unfinished chunks
-			chunkSpeed := zap.Skip()
+			encodeSpeedField := zap.Skip()
 			if bytesRead > 0 {
-				chunkSpeed = zap.Float64("speed(MiB/s)", bytesRead/(1048576e-9*nanoseconds))
+				encodeSpeedField = zap.Float64("encode speed(MiB/s)", bytesRead/(1048576e-9*nanoseconds))
 			}
 
 			// Note: a speed of 28 MiB/s roughly corresponds to 100 GiB/hour.
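
A minimal, self-contained sketch of the weighting described in the comment above (not part of the diff: totalProgress and backendIsTiDB are illustrative stand-ins for the controller state, i.e. rc.cfg.TikvImporter.Backend == config.BackendTiDB, and the ProgressGauge updates are omitted):

// progress_sketch.go: illustration only; mirrors the weighting added to restore.go above.
package main

import (
	"fmt"
	"math"
)

// totalProgress combines the two phases the way the patch does: the restore and import
// percentages are averaged for the local/importer backends, while the tidb backend uses
// the restore percentage alone.
func totalProgress(restoredBytes, totalRestoreBytes, bytesWritten, bytesImported,
	engineFinished, engineEstimated float64, backendIsTiDB bool) float64 {
	if restoredBytes <= 0 {
		return 0
	}
	restorePercent := math.Min(restoredBytes/totalRestoreBytes, 1.0)
	if backendIsTiDB {
		return restorePercent
	}
	importPercent := 0.0
	if bytesWritten > 0 {
		// estimated total import bytes; take the bigger of the byte-based and
		// engine-based estimates, as the patch does.
		totalImportBytes := bytesWritten / restorePercent
		importPercent = math.Min(math.Max(bytesImported/totalImportBytes, engineFinished/engineEstimated), 1.0)
	}
	return (restorePercent + importPercent) / 2
}

func main() {
	// half of the source bytes restored, 40 bytes written, 30 imported, 2 of 4 engines finished
	fmt.Printf("%.2f\n", totalProgress(50, 100, 40, 30, 2, 4, false)) // prints 0.50
}

Averaging the two phases keeps the estimate simple; the byte-based and engine-based import estimates can disagree, and taking the bigger one only guards against under-counting imported bytes.
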
@@ -1237,7 +1266,8 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s zap.String("tables", fmt.Sprintf("%.0f/%.0f%s", completedTables, totalTables, formatPercent(completedTables, totalTables))), zap.String("chunks", fmt.Sprintf("%.0f/%.0f%s", finished, estimated, formatPercent(finished, estimated))), zap.String("engines", fmt.Sprintf("%.f/%.f%s", engineFinished, engineEstimated, formatPercent(engineFinished, engineEstimated))), - chunkSpeed, + restoreBytesField, importBytesField, + encodeSpeedField, zap.String("state", state), remaining, ) @@ -1447,6 +1477,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) { finalErr = err return } + logTask.End(zap.ErrorLevel, nil) // clean up task metas if cleanup { logTask.Info("cleanup task metas") @@ -1473,11 +1504,13 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) { for task := range taskCh { tableLogTask := task.tr.logger.Begin(zap.InfoLevel, "restore table") web.BroadcastTableCheckpoint(task.tr.tableName, task.cp) + needPostProcess, err := task.tr.restoreTable(ctx2, rc, task.cp) + err = common.NormalizeOrWrapErr(common.ErrRestoreTable, err, task.tr.tableName) tableLogTask.End(zap.ErrorLevel, err) web.BroadcastError(task.tr.tableName, err) - metric.RecordTableCount("completed", err) + metric.RecordTableCount(metric.TableStateCompleted, err) restoreErr.Set(err) if needPostProcess { postProcessTaskChan <- task @@ -1487,6 +1520,8 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) { }() } + var allTasks []task + var totalDataSizeToRestore int64 for _, dbMeta := range rc.dbMetas { dbInfo := rc.dbInfos[dbMeta.Name] for _, tableMeta := range dbMeta.Tables { @@ -1508,15 +1543,33 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) { return errors.Trace(err) } - wg.Add(1) - select { - case taskCh <- task{tr: tr, cp: cp}: - case <-ctx.Done(): - return ctx.Err() + allTasks = append(allTasks, task{tr: tr, cp: cp}) + + if len(cp.Engines) == 0 { + for _, fi := range tableMeta.DataFiles { + totalDataSizeToRestore += fi.FileMeta.FileSize + } + } else { + for _, eng := range cp.Engines { + for _, chunk := range eng.Chunks { + totalDataSizeToRestore += chunk.Chunk.EndOffset - chunk.Chunk.Offset + } + } } } } + metric.BytesCounter.WithLabelValues(metric.BytesStateTotalRestore).Add(float64(totalDataSizeToRestore)) + + for i := range allTasks { + wg.Add(1) + select { + case taskCh <- allTasks[i]: + case <-ctx.Done(): + return ctx.Err() + } + } + wg.Wait() // if context is done, should return directly select { @@ -2154,7 +2207,8 @@ func (cr *chunkRestore) deliverLoop( var kvPacket []deliveredKVs // init these two field as checkpoint current value, so even if there are no kv pairs delivered, // chunk checkpoint should stay the same - offset := cr.chunk.Chunk.Offset + startOffset := cr.chunk.Chunk.Offset + currOffset := startOffset rowID := cr.chunk.Chunk.PrevRowIDMax populate: @@ -2168,7 +2222,7 @@ func (cr *chunkRestore) deliverLoop( for _, p := range kvPacket { p.kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum) columns = p.columns - offset = p.offset + currOffset = p.offset rowID = p.rowID } case <-ctx.Done(): @@ -2231,9 +2285,11 @@ func (cr *chunkRestore) deliverLoop( // In local mode, we should write these checkpoint after engine flushed. 
cr.chunk.Checksum.Add(&dataChecksum) cr.chunk.Checksum.Add(&indexChecksum) - cr.chunk.Chunk.Offset = offset + cr.chunk.Chunk.Offset = currOffset cr.chunk.Chunk.PrevRowIDMax = rowID + metric.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(currOffset - startOffset)) + if dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0 { // No need to save checkpoint if nothing was delivered. dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine) diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 9c3048d951..27410428ed 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -538,7 +538,7 @@ func (tr *TableRestore) restoreEngine( } if err == nil { metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Add(remainChunkCnt) - metric.BytesCounter.WithLabelValues(metric.TableStateWritten).Add(float64(cr.chunk.Checksum.SumSize())) + metric.BytesCounter.WithLabelValues(metric.BytesStateRestoreWritten).Add(float64(cr.chunk.Checksum.SumSize())) if dataFlushStatus != nil && indexFlushStaus != nil { if dataFlushStatus.Flushed() && indexFlushStaus.Flushed() { saveCheckpoint(rc, tr, engineID, cr.chunk)
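
The request logging added to lightning.go above follows the common wrap-the-ResponseWriter pattern. The sketch below is a self-contained illustration of that pattern using only the standard library plus net/http/httptest; capturingWriter and wrap are stand-ins for the unexported loggingResponseWriter and httpHandleWrapper, and the printed line stands in for the structured zap log entry.

// wrapper_sketch.go: illustration only; not part of the diff.
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// capturingWriter records the status code and the first bytes of the response body
// so they can be attached to the request log, like loggingResponseWriter above.
type capturingWriter struct {
	http.ResponseWriter
	status int
	body   string
}

func (w *capturingWriter) WriteHeader(code int) {
	w.status = code
	w.ResponseWriter.WriteHeader(code)
}

func (w *capturingWriter) Write(d []byte) (int, error) {
	if w.body == "" && len(d) > 0 {
		n := len(d)
		if n > 1024 { // keep at most 1 KiB for logging
			n = 1024
		}
		w.body = string(d[:n])
	}
	return w.ResponseWriter.Write(d)
}

// wrap logs method, URL, status and the truncated body around any http.HandlerFunc.
func wrap(h http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		cw := &capturingWriter{ResponseWriter: w, status: http.StatusOK}
		h.ServeHTTP(cw, r)
		fmt.Printf("%s %s -> %d %q\n", r.Method, r.URL, cw.status, cw.body)
	}
}

func main() {
	handler := wrap(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusNotFound)
		fmt.Fprint(w, `{"error":"no task"}`)
	})
	rec := httptest.NewRecorder()
	handler(rec, httptest.NewRequest(http.MethodGet, "/progress/task", nil))
	// prints: GET /progress/task -> 404 "{\"error\":\"no task\"}"
}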
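
The progress denominator added in restoreTables follows directly from the checkpoint layout: before a table has any engine checkpoints its whole data files count, afterwards only the still-undelivered chunk ranges (EndOffset - Offset) do, and deliverLoop then advances the restored counter by the delivered offset delta. A sketch of that accumulation with simplified stand-in types (chunk, engine, table), not the real checkpoint structs:

// restore_size_sketch.go: illustration only; not part of the diff.
package main

import "fmt"

type chunk struct{ offset, endOffset int64 }
type engine struct{ chunks []chunk }
type table struct {
	fileSizes []int64  // source data file sizes, used before any checkpoint exists
	engines   []engine // engine checkpoints, used once they exist
}

// totalBytesToRestore mirrors the accumulation added in restoreTables: fall back to raw
// file sizes when there are no engine checkpoints, otherwise count only the byte ranges
// that still have to be delivered.
func totalBytesToRestore(tables []table) int64 {
	var total int64
	for _, t := range tables {
		if len(t.engines) == 0 {
			for _, size := range t.fileSizes {
				total += size
			}
			continue
		}
		for _, e := range t.engines {
			for _, c := range e.chunks {
				total += c.endOffset - c.offset
			}
		}
	}
	return total
}

func main() {
	tables := []table{
		{fileSizes: []int64{1000, 2000}}, // fresh table: whole files count
		{engines: []engine{{chunks: []chunk{{offset: 500, endOffset: 800}}}}}, // resumed: 300 bytes left
	}
	fmt.Println(totalBytesToRestore(tables)) // prints 3300
}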