lightning: enhance request log in server-mode and restore progress (#33718)
close pingcap/tidb#33715
@@ -1243,7 +1243,7 @@ loopWrite:
 		engine.importedKVSize.Add(rangeStats.totalBytes)
 		engine.importedKVCount.Add(rangeStats.count)
 		engine.finishedRanges.add(finishedRange)
-		metric.BytesCounter.WithLabelValues(metric.TableStateImported).Add(float64(rangeStats.totalBytes))
+		metric.BytesCounter.WithLabelValues(metric.BytesStateImported).Add(float64(rangeStats.totalBytes))
 	}
 	return errors.Trace(err)
 }
@@ -133,6 +133,50 @@ func (l *Lightning) GoServe() error {
 	return l.goServe(statusAddr, io.Discard)
 }
 
+// TODO: maybe handle http request using gin
+type loggingResponseWriter struct {
+	http.ResponseWriter
+	statusCode int
+	body       string
+}
+
+func newLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
+	return &loggingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK}
+}
+
+func (lrw *loggingResponseWriter) WriteHeader(code int) {
+	lrw.statusCode = code
+	lrw.ResponseWriter.WriteHeader(code)
+}
+
+func (lrw *loggingResponseWriter) Write(d []byte) (int, error) {
+	// keep first part of the response for logging, max 1K
+	if lrw.body == "" && len(d) > 0 {
+		length := len(d)
+		if length > 1024 {
+			length = 1024
+		}
+		lrw.body = string(d[:length])
+	}
+	return lrw.ResponseWriter.Write(d)
+}
+
+func httpHandleWrapper(h http.HandlerFunc) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		logger := log.L().With(zap.String("method", r.Method), zap.Stringer("url", r.URL)).
+			Begin(zapcore.InfoLevel, "process http request")
+
+		newWriter := newLoggingResponseWriter(w)
+		h.ServeHTTP(newWriter, r)
+
+		bodyField := zap.Skip()
+		if newWriter.Header().Get("Content-Encoding") != "gzip" {
+			bodyField = zap.String("body", newWriter.body)
+		}
+		logger.End(zapcore.InfoLevel, nil, zap.Int("status", newWriter.statusCode), bodyField)
+	}
+}
+
 func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
 	mux := http.NewServeMux()
 	mux.Handle("/", http.RedirectHandler("/web/", http.StatusFound))
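
Note: for context, a minimal, self-contained sketch of the wrapper pattern this hunk introduces: a decorated http.ResponseWriter that records the status code so the middleware can log it after the handler returns. The handler, address, and plain log.Printf call are illustrative, not part of this patch.

package main

import (
	"log"
	"net/http"
)

// statusRecorder decorates http.ResponseWriter and remembers the status code.
type statusRecorder struct {
	http.ResponseWriter
	statusCode int
}

func (r *statusRecorder) WriteHeader(code int) {
	r.statusCode = code
	r.ResponseWriter.WriteHeader(code)
}

// logRequests wraps a handler, serves the request through the recorder,
// then emits one log line per request, mirroring httpHandleWrapper above.
func logRequests(h http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		rec := &statusRecorder{ResponseWriter: w, statusCode: http.StatusOK}
		h.ServeHTTP(rec, r)
		log.Printf("method=%s url=%s status=%d", r.Method, r.URL, rec.statusCode)
	}
}

func main() {
	http.HandleFunc("/ping", logRequests(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusNoContent)
	}))
	log.Fatal(http.ListenAndServe("127.0.0.1:8080", nil))
}
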
@@ -145,13 +189,13 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
 	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
 
 	handleTasks := http.StripPrefix("/tasks", http.HandlerFunc(l.handleTask))
-	mux.Handle("/tasks", handleTasks)
-	mux.Handle("/tasks/", handleTasks)
-	mux.HandleFunc("/progress/task", handleProgressTask)
-	mux.HandleFunc("/progress/table", handleProgressTable)
-	mux.HandleFunc("/pause", handlePause)
-	mux.HandleFunc("/resume", handleResume)
-	mux.HandleFunc("/loglevel", handleLogLevel)
+	mux.Handle("/tasks", httpHandleWrapper(handleTasks.ServeHTTP))
+	mux.Handle("/tasks/", httpHandleWrapper(handleTasks.ServeHTTP))
+	mux.HandleFunc("/progress/task", httpHandleWrapper(handleProgressTask))
+	mux.HandleFunc("/progress/table", httpHandleWrapper(handleProgressTable))
+	mux.HandleFunc("/pause", httpHandleWrapper(handlePause))
+	mux.HandleFunc("/resume", httpHandleWrapper(handleResume))
+	mux.HandleFunc("/loglevel", httpHandleWrapper(handleLogLevel))
 
 	mux.Handle("/web/", http.StripPrefix("/web", httpgzip.FileServer(web.Res, httpgzip.FileServerOptions{
 		IndexHTML: true,
@@ -215,7 +259,8 @@ func (l *Lightning) RunServer() error {
 		if err != nil {
 			return err
 		}
-		err = l.run(context.Background(), task, nil)
+		o := &options{}
+		err = l.run(context.Background(), task, o)
 		if err != nil && !common.IsContextCanceledError(err) {
 			restore.DeliverPauser.Pause() // force pause the progress on error
 			log.L().Error("tidb lightning encountered error", zap.Error(err))
@@ -559,7 +604,7 @@ func (l *Lightning) handlePostTask(w http.ResponseWriter, req *http.Request) {
 		writeJSONError(w, http.StatusBadRequest, "cannot read request", err)
 		return
 	}
-	log.L().Debug("received task config", zap.ByteString("content", data))
+	log.L().Info("received task config", zap.ByteString("content", data))
 
 	cfg := config.NewConfig()
 	if err = cfg.LoadFromGlobal(l.globalCfg); err != nil {
@@ -24,10 +24,18 @@ import (
 const (
 	// states used for the TableCounter labels
 	TableStatePending   = "pending"
 	TableStateWritten   = "written"
 	TableStateImported  = "imported"
 	TableStateCompleted = "completed"
 
+	BytesStateTotalRestore   = "total_restore" // total source data bytes needs to restore
+	BytesStateRestored       = "restored"      // source data bytes restored during restore engine
+	BytesStateRestoreWritten = "written"       // bytes written during restore engine
+	BytesStateImported       = "imported"      // bytes imported during import engine
+
+	ProgressPhaseTotal   = "total"   // total restore progress(not include post-process, like checksum and analyze)
+	ProgressPhaseRestore = "restore" // restore engine progress
+	ProgressPhaseImport  = "import"  // import engine progress
+
 	// results used for the TableCounter labels
 	TableResultSuccess = "success"
 	TableResultFailure = "failure"
@@ -193,6 +201,14 @@
 			Help: "disk/memory size currently occupied by intermediate files in local backend",
 		}, []string{"medium"},
 	)
+
+	ProgressGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "lightning",
+			Name:      "progress",
+			Help:      "progress of lightning phase",
+		}, []string{"phase"},
+	)
 )
 
 //nolint:gochecknoinits // TODO: refactor
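
Note: a minimal sketch of how a GaugeVec like the new ProgressGauge is registered and updated with prometheus/client_golang; the label value and the testutil read-back are illustrative, not from this patch.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Same shape as ProgressGauge above: one gauge per "phase" label.
	progress := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "lightning",
		Name:      "progress",
		Help:      "progress of lightning phase",
	}, []string{"phase"})
	prometheus.MustRegister(progress)

	progress.WithLabelValues("restore").Set(0.42)

	// testutil.ToFloat64 reads a single metric back out, handy in tests.
	fmt.Println(testutil.ToFloat64(progress.WithLabelValues("restore"))) // 0.42
}
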
@@ -216,6 +232,7 @@ func init() {
 	prometheus.MustRegister(ChunkParserReadBlockSecondsHistogram)
 	prometheus.MustRegister(ApplyWorkerSecondsHistogram)
 	prometheus.MustRegister(LocalStorageUsageBytesGauge)
+	prometheus.MustRegister(ProgressGauge)
 }
 
 func RecordTableCount(status string, err error) {
@@ -1162,6 +1162,8 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
 		case <-logProgressChan:
 			// log the current progress periodically, so OPS will know that we're still working
 			nanoseconds := float64(time.Since(start).Nanoseconds())
+			totalRestoreBytes := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.BytesStateTotalRestore))
+			restoredBytes := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.BytesStateRestored))
 			// the estimated chunk is not accurate(likely under estimated), but the actual count is not accurate
 			// before the last table start, so use the bigger of the two should be a workaround
 			estimated := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated))
@@ -1179,8 +1181,8 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
 				engineEstimated = enginePending
 			}
 			engineFinished := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.TableStateImported, metric.TableResultSuccess))
-			bytesWritten := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.TableStateWritten))
-			bytesImported := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.TableStateImported))
+			bytesWritten := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.BytesStateRestoreWritten))
+			bytesImported := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.BytesStateImported))
 
 			var state string
 			var remaining zap.Field
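
Note: metric.ReadCounter is lightning's own helper for reading a counter's current value outside the scrape path. A hedged standalone equivalent (the error sentinel here is simplified and may differ from lightning's):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// readCounter serializes the counter into the dto protobuf and returns its
// value, or -1 if it cannot be read. Sketch only, not lightning's code.
func readCounter(c prometheus.Counter) float64 {
	var m dto.Metric
	if err := c.Write(&m); err != nil {
		return -1
	}
	return m.Counter.GetValue()
}

func main() {
	bytes := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "lightning",
		Name:      "bytes",
		Help:      "sketch counter",
	}, []string{"state"})
	bytes.WithLabelValues("restored").Add(4096)
	fmt.Println(readCounter(bytes.WithLabelValues("restored"))) // 4096
}
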
@@ -1197,37 +1199,64 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
 				state = "preparing"
 			}
 
-			// since we can't accurately estimate the extra time cost by import after all writing are finished,
-			// so here we use estimatedWritingProgress * 0.8 + estimatedImportingProgress * 0.2 as the total
-			// progress.
+			// lightning restore is separated into restore engine and import engine, they are both parallelized
+			// and pipelined between engines, so we can only weight the progress of those 2 phase to get the
+			// total progress.
+			//
+			// for local & importer backend:
+			// in most case import engine is faster since there's little computations, but inside one engine
+			// restore and import is serialized, the progress of those two will not differ too much, and
+			// import engine determines the end time of the whole restore, so we average them for now.
+			// the result progress may fall behind the real progress if import is faster.
+			//
+			// for tidb backend, we do nothing during import engine, so we use restore engine progress as the
+			// total progress.
+			restoreBytesField := zap.Skip()
+			importBytesField := zap.Skip()
 			remaining = zap.Skip()
 			totalPercent := 0.0
-			if finished > 0 {
-				writePercent := math.Min(finished/estimated, 1.0)
-				importPercent := 1.0
-				if bytesWritten > 0 {
-					totalBytes := bytesWritten / writePercent
-					importPercent = math.Min(bytesImported/totalBytes, 1.0)
+			if restoredBytes > 0 {
+				restorePercent := math.Min(restoredBytes/totalRestoreBytes, 1.0)
+				metric.ProgressGauge.WithLabelValues(metric.ProgressPhaseRestore).Set(restorePercent)
+				if rc.cfg.TikvImporter.Backend != config.BackendTiDB {
+					var importPercent float64
+					if bytesWritten > 0 {
+						// estimate total import bytes from written bytes
+						// when importPercent = 1, totalImportBytes = bytesWritten, but there's case
+						// bytesImported may bigger or smaller than bytesWritten such as when deduplicate
+						// we calculate progress using engines then use the bigger one in case bytesImported is
+						// smaller.
+						totalImportBytes := bytesWritten / restorePercent
+						biggerPercent := math.Max(bytesImported/totalImportBytes, engineFinished/engineEstimated)
+						importPercent = math.Min(biggerPercent, 1.0)
+						importBytesField = zap.String("import-bytes", fmt.Sprintf("%s/%s(estimated)",
+							units.BytesSize(bytesImported), units.BytesSize(totalImportBytes)))
+					}
+					metric.ProgressGauge.WithLabelValues(metric.ProgressPhaseImport).Set(importPercent)
+					totalPercent = (restorePercent + importPercent) / 2
+				} else {
+					totalPercent = restorePercent
 				}
-				totalPercent = writePercent*0.8 + importPercent*0.2
 				if totalPercent < 1.0 {
 					remainNanoseconds := (1.0 - totalPercent) / totalPercent * nanoseconds
 					remaining = zap.Duration("remaining", time.Duration(remainNanoseconds).Round(time.Second))
 				}
+				restoreBytesField = zap.String("restore-bytes", fmt.Sprintf("%s/%s",
+					units.BytesSize(restoredBytes), units.BytesSize(totalRestoreBytes)))
 			}
+			metric.ProgressGauge.WithLabelValues(metric.ProgressPhaseTotal).Set(totalPercent)
 
-			formatPercent := func(finish, estimate float64) string {
-				speed := ""
-				if estimated > 0 {
-					speed = fmt.Sprintf(" (%.1f%%)", finish/estimate*100)
+			formatPercent := func(num, denom float64) string {
+				if denom > 0 {
+					return fmt.Sprintf(" (%.1f%%)", num/denom*100)
 				}
-				return speed
+				return ""
 			}
 
 			// avoid output bytes speed if there are no unfinished chunks
-			chunkSpeed := zap.Skip()
+			encodeSpeedField := zap.Skip()
 			if bytesRead > 0 {
-				chunkSpeed = zap.Float64("speed(MiB/s)", bytesRead/(1048576e-9*nanoseconds))
+				encodeSpeedField = zap.Float64("encode speed(MiB/s)", bytesRead/(1048576e-9*nanoseconds))
 			}
 
 			// Note: a speed of 28 MiB/s roughly corresponds to 100 GiB/hour.
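
Note: a worked sketch of the weighting the new comments describe, with made-up numbers. The function shape and the tidbBackend flag are illustrative; the arithmetic mirrors the hunk above.

package main

import (
	"fmt"
	"math"
)

// totalProgress averages restore and import progress for local/importer
// backends, and falls back to restore progress alone for the tidb backend.
func totalProgress(restoredBytes, totalRestoreBytes,
	bytesWritten, bytesImported,
	engineFinished, engineEstimated float64, tidbBackend bool) float64 {
	restorePercent := math.Min(restoredBytes/totalRestoreBytes, 1.0)
	if tidbBackend {
		return restorePercent // tidb backend: import phase does nothing
	}
	var importPercent float64
	if bytesWritten > 0 {
		// totalImportBytes is estimated from written bytes; take the bigger
		// of the byte-based and engine-based ratios, as the patch does.
		totalImportBytes := bytesWritten / restorePercent
		importPercent = math.Min(math.Max(bytesImported/totalImportBytes,
			engineFinished/engineEstimated), 1.0)
	}
	return (restorePercent + importPercent) / 2
}

func main() {
	// 40 of 100 GiB restored, 30 GiB written, 15 GiB imported, 10 of 40
	// engines done => restore 0.40, import max(0.20, 0.25) = 0.25,
	// total (0.40 + 0.25) / 2 = 0.325.
	fmt.Printf("%.4f\n", totalProgress(40, 100, 30, 15, 10, 40, false)) // 0.3250
}
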
@@ -1237,7 +1266,8 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
 				zap.String("tables", fmt.Sprintf("%.0f/%.0f%s", completedTables, totalTables, formatPercent(completedTables, totalTables))),
 				zap.String("chunks", fmt.Sprintf("%.0f/%.0f%s", finished, estimated, formatPercent(finished, estimated))),
 				zap.String("engines", fmt.Sprintf("%.f/%.f%s", engineFinished, engineEstimated, formatPercent(engineFinished, engineEstimated))),
-				chunkSpeed,
+				restoreBytesField, importBytesField,
+				encodeSpeedField,
 				zap.String("state", state),
 				remaining,
 			)
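
Note: the optional fields above rely on a small zap idiom: zap.Skip() is a no-op field, so a field can be passed to the log call unconditionally and only filled in when it is meaningful. A minimal sketch (numbers invented):

package main

import (
	"time"

	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()

	totalPercent, elapsed := 0.325, 10*time.Minute // made-up progress numbers
	remaining := zap.Skip()                        // no-op field when we cannot estimate
	if totalPercent > 0 && totalPercent < 1.0 {
		est := time.Duration((1.0 - totalPercent) / totalPercent * float64(elapsed))
		remaining = zap.Duration("remaining", est.Round(time.Second))
	}
	logger.Info("progress", zap.Float64("total", totalPercent), remaining)
}
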
@@ -1447,6 +1477,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
 		finalErr = err
 		return
 	}
+	logTask.End(zap.ErrorLevel, nil)
 	// clean up task metas
 	if cleanup {
 		logTask.Info("cleanup task metas")
@@ -1473,11 +1504,13 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
 		for task := range taskCh {
 			tableLogTask := task.tr.logger.Begin(zap.InfoLevel, "restore table")
 			web.BroadcastTableCheckpoint(task.tr.tableName, task.cp)
+
 			needPostProcess, err := task.tr.restoreTable(ctx2, rc, task.cp)
+
 			err = common.NormalizeOrWrapErr(common.ErrRestoreTable, err, task.tr.tableName)
 			tableLogTask.End(zap.ErrorLevel, err)
 			web.BroadcastError(task.tr.tableName, err)
-			metric.RecordTableCount("completed", err)
+			metric.RecordTableCount(metric.TableStateCompleted, err)
 			restoreErr.Set(err)
 			if needPostProcess {
 				postProcessTaskChan <- task
@@ -1487,6 +1520,8 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
 		}()
 	}
 
+	var allTasks []task
+	var totalDataSizeToRestore int64
 	for _, dbMeta := range rc.dbMetas {
 		dbInfo := rc.dbInfos[dbMeta.Name]
 		for _, tableMeta := range dbMeta.Tables {
@@ -1508,15 +1543,33 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
 				return errors.Trace(err)
 			}
 
-			wg.Add(1)
-			select {
-			case taskCh <- task{tr: tr, cp: cp}:
-			case <-ctx.Done():
-				return ctx.Err()
-			}
+			allTasks = append(allTasks, task{tr: tr, cp: cp})
+
+			if len(cp.Engines) == 0 {
+				for _, fi := range tableMeta.DataFiles {
+					totalDataSizeToRestore += fi.FileMeta.FileSize
+				}
+			} else {
+				for _, eng := range cp.Engines {
+					for _, chunk := range eng.Chunks {
+						totalDataSizeToRestore += chunk.Chunk.EndOffset - chunk.Chunk.Offset
+					}
+				}
+			}
 		}
 	}
 
+	metric.BytesCounter.WithLabelValues(metric.BytesStateTotalRestore).Add(float64(totalDataSizeToRestore))
+
+	for i := range allTasks {
+		wg.Add(1)
+		select {
+		case taskCh <- allTasks[i]:
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+
 	wg.Wait()
 	// if context is done, should return directly
 	select {
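
Note: the accounting above derives the total bytes to restore either from source file sizes (fresh task, no checkpointed engines) or from the chunk byte ranges recorded in checkpoints (resumed task). A simplified sketch with hypothetical types; the real structs live in lightning's checkpoints package.

package main

import "fmt"

type chunkRange struct{ Offset, EndOffset int64 }

// totalDataSize sums file sizes when no checkpointed chunks exist, otherwise
// sums the byte ranges of the checkpointed chunks, as the hunk above does.
func totalDataSize(fileSizes []int64, chunks []chunkRange) int64 {
	var total int64
	if len(chunks) == 0 {
		for _, s := range fileSizes {
			total += s
		}
		return total
	}
	for _, c := range chunks {
		total += c.EndOffset - c.Offset
	}
	return total
}

func main() {
	fmt.Println(totalDataSize([]int64{1 << 20, 2 << 20}, nil))              // 3145728
	fmt.Println(totalDataSize(nil, []chunkRange{{0, 4096}, {4096, 10240}})) // 10240
}
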
@@ -2154,7 +2207,8 @@ func (cr *chunkRestore) deliverLoop(
 	var kvPacket []deliveredKVs
 	// init these two field as checkpoint current value, so even if there are no kv pairs delivered,
 	// chunk checkpoint should stay the same
-	offset := cr.chunk.Chunk.Offset
+	startOffset := cr.chunk.Chunk.Offset
+	currOffset := startOffset
 	rowID := cr.chunk.Chunk.PrevRowIDMax
 
 populate:
@@ -2168,7 +2222,7 @@ func (cr *chunkRestore) deliverLoop(
 			for _, p := range kvPacket {
 				p.kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum)
 				columns = p.columns
-				offset = p.offset
+				currOffset = p.offset
 				rowID = p.rowID
 			}
 		case <-ctx.Done():
@@ -2231,9 +2285,11 @@ func (cr *chunkRestore) deliverLoop(
 		// In local mode, we should write these checkpoint after engine flushed.
 		cr.chunk.Checksum.Add(&dataChecksum)
 		cr.chunk.Checksum.Add(&indexChecksum)
-		cr.chunk.Chunk.Offset = offset
+		cr.chunk.Chunk.Offset = currOffset
 		cr.chunk.Chunk.PrevRowIDMax = rowID
 
+		metric.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(currOffset - startOffset))
+
 		if dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0 {
 			// No need to save checkpoint if nothing was delivered.
 			dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine)
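
Note: startOffset and currOffset both begin at the checkpointed chunk offset, so the restored-bytes counter advances only by the delta actually delivered; nothing is added when no KV pairs arrive, and bytes already covered by a checkpoint are not recounted as new progress. A toy illustration of the delta idea (the counter type is invented):

package main

import "fmt"

type progressCounter struct{ total int64 }

func (p *progressCounter) Add(delta int64) { p.total += delta }

func main() {
	var restored progressCounter
	startOffset := int64(4096) // chunk position restored from a checkpoint
	currOffset := startOffset

	for _, batchEnd := range []int64{5120, 8192, 16384} { // offsets after delivered batches
		currOffset = batchEnd
	}
	restored.Add(currOffset - startOffset) // only newly delivered bytes count
	fmt.Println(restored.total)            // 12288
}
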
@@ -538,7 +538,7 @@ func (tr *TableRestore) restoreEngine(
 		}
 		if err == nil {
 			metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Add(remainChunkCnt)
-			metric.BytesCounter.WithLabelValues(metric.TableStateWritten).Add(float64(cr.chunk.Checksum.SumSize()))
+			metric.BytesCounter.WithLabelValues(metric.BytesStateRestoreWritten).Add(float64(cr.chunk.Checksum.SumSize()))
			if dataFlushStatus != nil && indexFlushStaus != nil {
 				if dataFlushStatus.Flushed() && indexFlushStaus.Flushed() {
 					saveCheckpoint(rc, tr, engineID, cr.chunk)