// Copyright 2017 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package server import ( "archive/zip" "bytes" "context" "crypto/tls" "crypto/x509" "encoding/json" "fmt" "io" "net" "net/http" "net/http/pprof" "net/url" "os" "path/filepath" "runtime" "runtime/coverage" rpprof "runtime/pprof" "strconv" "strings" "sync" "time" "github.com/gorilla/mux" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/fn" pb "github.com/pingcap/kvproto/pkg/autoid" autoid "github.com/pingcap/tidb/pkg/autoid_service" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/server/handler" "github.com/pingcap/tidb/pkg/server/handler/optimizor" "github.com/pingcap/tidb/pkg/server/handler/tikvhandler" "github.com/pingcap/tidb/pkg/server/handler/ttlhandler" util2 "github.com/pingcap/tidb/pkg/server/internal/util" "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/statistics/handle/initstats" "github.com/pingcap/tidb/pkg/store" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/cpuprofile" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/printer" "github.com/pingcap/tidb/pkg/util/traceevent" "github.com/pingcap/tidb/pkg/util/versioninfo" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/soheilhy/cmux" "github.com/tiancaiamao/appdash/traceapp" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/channelz/service" static "sourcegraph.com/sourcegraph/appdash-data" ) const defaultStatusPort = 10080 func (s *Server) startStatusHTTP() error { err := s.initHTTPListener() if err != nil { return err } go s.startHTTPServer() return nil } func serveError(w http.ResponseWriter, status int, txt string) { w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("X-Go-Pprof", "1") w.Header().Del("Content-Disposition") w.WriteHeader(status) _, err := fmt.Fprintln(w, txt) terror.Log(err) } func sleepWithCtx(ctx context.Context, d time.Duration) { select { case <-time.After(d): case <-ctx.Done(): } } func (s *Server) listenStatusHTTPServer() error { s.statusAddr = net.JoinHostPort(s.cfg.Status.StatusHost, strconv.Itoa(int(s.cfg.Status.StatusPort))) if s.cfg.Status.StatusPort == 0 && !RunInGoTest { s.statusAddr = net.JoinHostPort(s.cfg.Status.StatusHost, strconv.Itoa(defaultStatusPort)) } logutil.BgLogger().Info("for status and metrics report", zap.String("listening on addr", s.statusAddr)) clusterSecurity := s.cfg.Security.ClusterSecurity() tlsConfig, err := clusterSecurity.ToTLSConfig() if err != nil { logutil.BgLogger().Error("invalid TLS config", zap.Error(err)) return errors.Trace(err) } tlsConfig = s.SetCNChecker(tlsConfig) if tlsConfig != nil { // The protocols should be listed as the same order we dispatch the connection with cmux. tlsConfig.NextProtos = []string{"http/1.1", "h2"} // we need to manage TLS here for cmux to distinguish between HTTP and gRPC. s.statusListener, err = tls.Listen("tcp", s.statusAddr, tlsConfig) } else { s.statusListener, err = net.Listen("tcp", s.statusAddr) } if err != nil { logutil.BgLogger().Info("listen failed", zap.Error(err)) return errors.Trace(err) } else if RunInGoTest && s.cfg.Status.StatusPort == 0 { s.statusAddr = s.statusListener.Addr().String() s.cfg.Status.StatusPort = uint(s.statusListener.Addr().(*net.TCPAddr).Port) } return nil } // Ballast try to reduce the GC frequency by using Ballast Object type Ballast struct { ballast []byte ballastLock sync.Mutex maxSize int } func newBallast(maxSize int) *Ballast { var b Ballast b.maxSize = 1024 * 1024 * 1024 * 2 if maxSize > 0 { b.maxSize = maxSize } else { // we try to use the total amount of ram as a reference to set the default ballastMaxSz // since the fatal throw "runtime: out of memory" would never yield to `recover` totalRAMSz, err := memory.MemTotal() if err != nil { logutil.BgLogger().Error("failed to get the total amount of RAM on this system", zap.Error(err)) } else { maxSzAdvice := totalRAMSz >> 2 if uint64(b.maxSize) > maxSzAdvice { b.maxSize = int(maxSzAdvice) } } } return &b } // GetSize get the size of ballast object func (b *Ballast) GetSize() int { var sz int b.ballastLock.Lock() sz = len(b.ballast) b.ballastLock.Unlock() return sz } // SetSize set the size of ballast object func (b *Ballast) SetSize(newSz int) error { if newSz < 0 { return fmt.Errorf("newSz cannot be negative: %d", newSz) } if newSz > b.maxSize { return fmt.Errorf("newSz cannot be bigger than %d but it has value %d", b.maxSize, newSz) } b.ballastLock.Lock() b.ballast = make([]byte, newSz) b.ballastLock.Unlock() return nil } // GenHTTPHandler generate a HTTP handler to get/set the size of this ballast object func (b *Ballast) GenHTTPHandler() func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: _, err := w.Write([]byte(strconv.Itoa(b.GetSize()))) terror.Log(err) case http.MethodPost: body, err := io.ReadAll(r.Body) if err != nil { terror.Log(err) return } newSz, err := strconv.Atoi(string(body)) if err == nil { err = b.SetSize(newSz) } if err != nil { w.WriteHeader(http.StatusBadRequest) errStr := err.Error() if _, err := w.Write([]byte(errStr)); err != nil { terror.Log(err) } return } } } } func (s *Server) startHTTPServer() { router := mux.NewRouter() router.HandleFunc("/status", s.handleStatus).Name("Status") // HTTP path for prometheus. router.Handle("/metrics", promhttp.Handler()).Name("Metrics") // HTTP path for dump statistics. router.Handle("/stats/dump/{db}/{table}", s.newStatsHandler()). Name("StatsDump") router.Handle("/stats/dump/{db}/{table}/{snapshot}", s.newStatsHistoryHandler()). Name("StatsHistoryDump") router.Handle("/stats/priority-queue", s.newStatsPriorityQueueHandler()). Name("StatsPriorityQueue") router.Handle("/plan_replayer/dump/{filename}", s.newPlanReplayerHandler()).Name("PlanReplayerDump") router.Handle("/extract_task/dump", s.newExtractServeHandler()).Name("ExtractTaskDump") router.Handle("/optimize_trace/dump/{filename}", s.newOptimizeTraceHandler()).Name("OptimizeTraceDump") tikvHandlerTool := s.NewTikvHandlerTool() router.Handle("/settings", tikvhandler.NewSettingsHandler(tikvHandlerTool)).Name("Settings") router.Handle("/schema", tikvhandler.NewSchemaHandler(tikvHandlerTool)).Name("Schema") router.Handle("/schema/{db}", tikvhandler.NewSchemaHandler(tikvHandlerTool)) router.Handle("/schema/{db}/{table}", tikvhandler.NewSchemaHandler(tikvHandlerTool)) router.Handle("/tables/{colID}/{colTp}/{colFlag}/{colLen}", tikvhandler.ValueHandler{}) router.Handle("/schema_storage", tikvhandler.NewSchemaStorageHandler(tikvHandlerTool)).Name("Schema Storage") router.Handle("/schema_storage/{db}", tikvhandler.NewSchemaStorageHandler(tikvHandlerTool)) router.Handle("/schema_storage/{db}/{table}", tikvhandler.NewSchemaStorageHandler(tikvHandlerTool)) router.Handle("/ddl/history", tikvhandler.NewDDLHistoryJobHandler(tikvHandlerTool)).Name("DDL_History") router.Handle("/ddl/owner/resign", tikvhandler.NewDDLResignOwnerHandler(tikvHandlerTool.Store.(kv.Storage))).Name("DDL_Owner_Resign") if kv.IsSystemKS(tikvHandlerTool.Store) { router.Handle("/dxf/schedule/status", tikvhandler.NewDXFScheduleStatusHandler()).Name("DXF_Schedule_Status") router.Handle("/dxf/schedule", tikvhandler.NewDXFScheduleHandler()).Name("DXF_Schedule") router.Handle("/dxf/schedule/tune", tikvhandler.NewDXFScheduleTuneHandler(tikvHandlerTool.Store.(kv.Storage))).Name("DXF_Schedule_Tune") router.Handle("/dxf/task/{taskID}/max_runtime_slots", tikvhandler.NewDXFTaskMaxRuntimeSlotsHandler()).Name("DXF_Task_Max_Runtime_Slots") } // HTTP path for transaction GC states. router.Handle("/txn-gc-states", tikvhandler.NewTxnGCStatesHandler(tikvHandlerTool.Store)) // HTTP path for get the TiDB config router.Handle("/config", fn.Wrap(func() (*config.Config, error) { return config.GetGlobalConfig(), nil })) router.Handle("/labels", tikvhandler.LabelHandler{}).Name("Labels") // HTTP path for get server info. router.Handle("/info", tikvhandler.NewServerInfoHandler(tikvHandlerTool)).Name("Info") router.Handle("/info/all", tikvhandler.NewAllServerInfoHandler(tikvHandlerTool)).Name("InfoALL") // HTTP path for get db and table info that is related to the tableID. router.Handle("/db-table/{tableID}", tikvhandler.NewDBTableHandler(tikvHandlerTool)) // HTTP path for get table tiflash replica info. router.Handle("/tiflash/replica-deprecated", tikvhandler.NewFlashReplicaHandler(tikvHandlerTool)) // HTTP path for upgrade operations. router.Handle("/upgrade/{op}", handler.NewClusterUpgradeHandler(tikvHandlerTool.Store.(kv.Storage))).Name("upgrade operations") // HTTP path for ingest configurations router.Handle("/ingest/max-batch-split-ranges", tikvhandler.NewIngestConcurrencyHandler( tikvHandlerTool, tikvhandler.IngestParamMaxBatchSplitRanges)).Name("IngestMaxBatchSplitRanges") router.Handle("/ingest/max-split-ranges-per-sec", tikvhandler.NewIngestConcurrencyHandler( tikvHandlerTool, tikvhandler.IngestParamMaxSplitRangesPerSec)).Name("IngestMaxSplitRangesPerSec") router.Handle("/ingest/max-ingest-inflight", tikvhandler.NewIngestConcurrencyHandler( tikvHandlerTool, tikvhandler.IngestParamMaxInflight)).Name("IngestMaxInflight") router.Handle("/ingest/max-ingest-per-sec", tikvhandler.NewIngestConcurrencyHandler( tikvHandlerTool, tikvhandler.IngestParamMaxPerSecond)).Name("IngestMaxPerSec") if s.cfg.Store == config.StoreTypeTiKV { // HTTP path for tikv. router.Handle("/tables/{db}/{table}/regions", tikvhandler.NewTableHandler(tikvHandlerTool, tikvhandler.OpTableRegions)) router.Handle("/tables/{db}/{table}/ranges", tikvhandler.NewTableHandler(tikvHandlerTool, tikvhandler.OpTableRanges)) router.Handle("/tables/{db}/{table}/scatter", tikvhandler.NewTableHandler(tikvHandlerTool, tikvhandler.OpTableScatter)) router.Handle("/tables/{db}/{table}/stop-scatter", tikvhandler.NewTableHandler(tikvHandlerTool, tikvhandler.OpStopTableScatter)) router.Handle("/tables/{db}/{table}/disk-usage", tikvhandler.NewTableHandler(tikvHandlerTool, tikvhandler.OpTableDiskUsage)) router.Handle("/regions/meta", tikvhandler.NewRegionHandler(tikvHandlerTool)).Name("RegionsMeta") router.Handle("/regions/hot", tikvhandler.NewRegionHandler(tikvHandlerTool)).Name("RegionHot") router.Handle("/regions/{regionID}", tikvhandler.NewRegionHandler(tikvHandlerTool)) } // HTTP path for get MVCC info router.Handle("/mvcc/key/{db}/{table}", tikvhandler.NewMvccTxnHandler(tikvHandlerTool, tikvhandler.OpMvccGetByKey)) router.Handle("/mvcc/key/{db}/{table}/{handle}", tikvhandler.NewMvccTxnHandler(tikvHandlerTool, tikvhandler.OpMvccGetByKey)) router.Handle("/mvcc/txn/{startTS}/{db}/{table}", tikvhandler.NewMvccTxnHandler(tikvHandlerTool, tikvhandler.OpMvccGetByTxn)) router.Handle("/mvcc/hex/{hexKey}", tikvhandler.NewMvccTxnHandler(tikvHandlerTool, tikvhandler.OpMvccGetByHex)) router.Handle("/mvcc/index/{db}/{table}/{index}", tikvhandler.NewMvccTxnHandler(tikvHandlerTool, tikvhandler.OpMvccGetByIdx)) router.Handle("/mvcc/index/{db}/{table}/{index}/{handle}", tikvhandler.NewMvccTxnHandler(tikvHandlerTool, tikvhandler.OpMvccGetByIdx)) // HTTP path for generate metric profile. router.Handle("/metrics/profile", tikvhandler.NewProfileHandler(tikvHandlerTool)) // HTTP path for web UI. if host, port, err := net.SplitHostPort(s.statusAddr); err == nil { if host == "" { host = "localhost" } baseURL := &url.URL{ Scheme: util.InternalHTTPSchema(), Host: net.JoinHostPort(host, port), } router.HandleFunc("/web/trace", traceapp.HandleTiDB).Name("Trace Viewer") sr := router.PathPrefix("/web/trace/").Subrouter() if _, err := traceapp.New(traceapp.NewRouter(sr), baseURL); err != nil { logutil.BgLogger().Error("new failed", zap.Error(err)) } router.PathPrefix("/static/").Handler(http.StripPrefix("/static", http.FileServer(static.Data))) } if s.StandbyController != nil { path, handler := s.StandbyController.Handler(s) router.PathPrefix(path).Handler(handler) } router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) router.HandleFunc("/debug/pprof/profile", cpuprofile.ProfileHTTPHandler) router.HandleFunc("/debug/pprof/symbol", pprof.Symbol) router.HandleFunc("/debug/pprof/trace", pprof.Trace) // Other /debug/pprof paths not covered above are redirected to pprof.Index. router.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index) router.HandleFunc("/debug/traceevent", traceeventHandler) router.HandleFunc("/covdata", func(writer http.ResponseWriter, _ *http.Request) { writer.Header().Set("Content-Type", "application/zip") writer.Header().Set("Content-Disposition", "attachment; filename=files.zip") dir := os.Getenv("TIDB_GOCOVERDIR") if dir == "" { serveError(writer, http.StatusInternalServerError, "TIDB_GOCOVERDIR is not set") return } err := coverage.WriteMetaDir(dir) if err != nil { logutil.BgLogger().Warn("write coverage meta failed", zap.Error(err)) serveError(writer, http.StatusInternalServerError, "write coverage meta failed") return } err = coverage.WriteCountersDir(dir) if err != nil { logutil.BgLogger().Warn("write coverage counters failed", zap.Error(err)) serveError(writer, http.StatusInternalServerError, "write coverage counters failed") return } zipWriter := zip.NewWriter(writer) err = filepath.Walk(dir, func(file string, fi os.FileInfo, err error) error { if err != nil { return err } if fi.IsDir() { return nil } relPath, err := filepath.Rel(dir, file) if err != nil { return err } writer, err := zipWriter.Create(relPath) if err != nil { return err } srcFile, err := os.Open(filepath.Clean(file)) if err != nil { return err } defer func() { _ = srcFile.Close() }() _, err = io.Copy(writer, srcFile) return err }) if err != nil { logutil.BgLogger().Warn("zip coverage files failed", zap.Error(err)) serveError(writer, http.StatusInternalServerError, "zip coverage files failed") return } err = zipWriter.Close() terror.Log(err) }) ballast := newBallast(s.cfg.MaxBallastObjectSize) { err := ballast.SetSize(s.cfg.BallastObjectSize) if err != nil { logutil.BgLogger().Error("set initial ballast object size failed", zap.Error(err)) } } router.HandleFunc("/debug/ballast-object-sz", ballast.GenHTTPHandler()) router.HandleFunc("/debug/gogc", func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: _, err := w.Write([]byte(strconv.Itoa(util.GetGOGC()))) terror.Log(err) case http.MethodPost: body, err := io.ReadAll(r.Body) if err != nil { terror.Log(err) return } val, err := strconv.Atoi(string(body)) if err != nil { w.WriteHeader(http.StatusBadRequest) if _, err := w.Write([]byte(err.Error())); err != nil { terror.Log(err) } return } util.SetGOGC(val) } }) router.HandleFunc("/debug/zip", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Disposition", `attachment; filename="tidb_debug"`+time.Now().Format("20060102150405")+".zip") // dump goroutine/heap/mutex items := []struct { name string gc int debug int second int }{ {name: "goroutine", debug: 2}, {name: "heap", gc: 1}, {name: "mutex"}, } zw := zip.NewWriter(w) for _, item := range items { p := rpprof.Lookup(item.name) if p == nil { serveError(w, http.StatusNotFound, "Unknown profile") return } if item.gc > 0 { runtime.GC() } fw, err := zw.Create(item.name) if err != nil { serveError(w, http.StatusInternalServerError, fmt.Sprintf("Create zipped %s fail: %v", item.name, err)) return } err = p.WriteTo(fw, item.debug) terror.Log(err) } // dump profile fw, err := zw.Create("profile") if err != nil { serveError(w, http.StatusInternalServerError, fmt.Sprintf("Create zipped %s fail: %v", "profile", err)) return } pc := cpuprofile.NewCollector() if err := pc.StartCPUProfile(fw); err != nil { serveError(w, http.StatusInternalServerError, fmt.Sprintf("Could not enable CPU profiling: %s", err)) return } sec, err := strconv.ParseInt(r.FormValue("seconds"), 10, 64) if sec <= 0 || err != nil { sec = 10 } sleepWithCtx(r.Context(), time.Duration(sec)*time.Second) err = pc.StopCPUProfile() if err != nil { serveError(w, http.StatusInternalServerError, fmt.Sprintf("Could not enable CPU profiling: %s", err)) return } // dump config fw, err = zw.Create("config") if err != nil { serveError(w, http.StatusInternalServerError, fmt.Sprintf("Create zipped %s fail: %v", "config", err)) return } js, err := json.MarshalIndent(config.GetGlobalConfig(), "", " ") if err != nil { serveError(w, http.StatusInternalServerError, fmt.Sprintf("get config info fail%v", err)) return } _, err = fw.Write(js) terror.Log(err) // dump version fw, err = zw.Create("version") if err != nil { serveError(w, http.StatusInternalServerError, fmt.Sprintf("Create zipped %s fail: %v", "version", err)) return } _, err = fw.Write([]byte(printer.GetTiDBInfo())) terror.Log(err) err = zw.Close() terror.Log(err) }) // failpoint is enabled only for tests so we can add some http APIs here for tests. failpoint.Inject("enableTestAPI", func() { router.PathPrefix("/fail/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { r.URL.Path = strings.TrimPrefix(r.URL.Path, "/fail") new(failpoint.HttpHandler).ServeHTTP(w, r) }) router.Handle("/test/{mod}/{op}", tikvhandler.NewTestHandler(tikvHandlerTool, 0)) }) // ddlHook is enabled only for tests so we can substitute the callback in the DDL. router.Handle("/test/ddl/hook", tikvhandler.DDLHookHandler{}) // ttlJobTriggerHandler is enabled only for tests, so we can accelerate the schedule of TTL job router.Handle("/test/ttl/trigger/{db}/{table}", ttlhandler.NewTTLJobTriggerHandler(tikvHandlerTool.Store.(kv.Storage))) var ( httpRouterPage bytes.Buffer pathTemplate string err error ) httpRouterPage.WriteString("TiDB Status and Metrics Report

TiDB Status and Metrics Report

") err = router.Walk(func(route *mux.Route, _ *mux.Router, _ []*mux.Route) error { pathTemplate, err = route.GetPathTemplate() if err != nil { logutil.BgLogger().Error("get HTTP router path failed", zap.Error(err)) } name := route.GetName() // If the name attribute is not set, GetName returns "". // "traceapp.xxx" are introduced by the traceapp package and are also ignored. if name != "" && !strings.HasPrefix(name, "traceapp") && err == nil { httpRouterPage.WriteString("") } return nil }) if err != nil { logutil.BgLogger().Error("generate root failed", zap.Error(err)) } httpRouterPage.WriteString("") httpRouterPage.WriteString("
" + name + "
Debug
") router.HandleFunc("/", func(responseWriter http.ResponseWriter, _ *http.Request) { _, err = responseWriter.Write(httpRouterPage.Bytes()) if err != nil { logutil.BgLogger().Error("write HTTP index page failed", zap.Error(err)) } }) serverMux := http.NewServeMux() serverMux.Handle("/", router) s.startStatusServerAndRPCServer(serverMux) } // setupAutoIDService initializes and registers the AutoID service for TiKV stores func (s *Server) setupAutoIDService(grpcServer *grpc.Server) { keyspaceName := config.GetGlobalKeyspaceName() var fullPath string if keyspaceName == "" { fullPath = fmt.Sprintf("%s://%s", s.cfg.Store, s.cfg.Path) } else { fullPath = fmt.Sprintf("%s://%s?keyspaceName=%s", s.cfg.Store, s.cfg.Path, keyspaceName) } store, err := store.New(fullPath) if err != nil { logutil.BgLogger().Error("new tikv store fail", zap.Error(err)) return } ebd, ok := store.(kv.EtcdBackend) if !ok { return } etcdAddr, err := ebd.EtcdAddrs() if err != nil { logutil.BgLogger().Error("tikv store not etcd background", zap.Error(err)) return } selfAddr := net.JoinHostPort(s.cfg.AdvertiseAddress, strconv.Itoa(int(s.cfg.Status.StatusPort))) service := autoid.New(selfAddr, etcdAddr, store, ebd.TLSConfig()) logutil.BgLogger().Info("register auto service at", zap.String("addr", selfAddr)) pb.RegisterAutoIDAllocServer(grpcServer, service) s.autoIDService = service } func (s *Server) startStatusServerAndRPCServer(serverMux *http.ServeMux) { m := cmux.New(s.statusListener) // Match connections in order: // First HTTP, and otherwise grpc. httpL := m.Match(cmux.HTTP1Fast()) grpcL := m.Match(cmux.Any()) statusServer := &http.Server{Addr: s.statusAddr, Handler: util2.NewCorsHandler(serverMux, s.cfg)} grpcServer := NewRPCServer(s.cfg, s.dom, s) service.RegisterChannelzServiceToServer(grpcServer) if s.cfg.Store == config.StoreTypeTiKV { s.setupAutoIDService(grpcServer) } s.statusServer.Store(statusServer) s.grpcServer = grpcServer go util.WithRecovery(func() { err := grpcServer.Serve(grpcL) logutil.BgLogger().Warn("grpc server error", zap.Error(err)) }, nil) go util.WithRecovery(func() { err := statusServer.Serve(httpL) logutil.BgLogger().Warn("http server error", zap.Error(err)) }, nil) err := m.Serve() if err != nil { logutil.BgLogger().Warn("start status/rpc server error", zap.Error(err)) } } // SetCNChecker set the CN checker for server. func (s *Server) SetCNChecker(tlsConfig *tls.Config) *tls.Config { if tlsConfig != nil && len(s.cfg.Security.ClusterVerifyCN) != 0 { checkCN := make(map[string]struct{}) for _, cn := range s.cfg.Security.ClusterVerifyCN { cn = strings.TrimSpace(cn) checkCN[cn] = struct{}{} } tlsConfig.VerifyPeerCertificate = func(_ [][]byte, verifiedChains [][]*x509.Certificate) error { for _, chain := range verifiedChains { if len(chain) != 0 { if _, match := checkCN[chain[0].Subject.CommonName]; match { return nil } } } return errors.Errorf("client certificate authentication failed. The Common Name from the client certificate was not found in the configuration cluster-verify-cn with value: %s", s.cfg.Security.ClusterVerifyCN) } tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert } return tlsConfig } // Status of TiDB. type Status struct { Connections int `json:"connections"` Version string `json:"version"` GitHash string `json:"git_hash"` Status DetailStatus `json:"status"` } // DetailStatus is to show the detail status of TiDB. for example the init stats percentage. type DetailStatus struct { InitStatsPercentage float64 `json:"init_stats_percentage"` } func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") // If the server is in the process of shutting down, return a non-200 status. // It is important not to return Status{} as acquiring the s.ConnectionCount() // acquires a lock that may already be held by the shutdown process. if !s.health.Load() { w.WriteHeader(http.StatusInternalServerError) return } initStatsPercentage := min(100, initstats.InitStatsPercentage.Load()) st := Status{ Connections: s.ConnectionCount(), Version: mysql.ServerVersion, GitHash: versioninfo.TiDBGitHash, Status: DetailStatus{ InitStatsPercentage: initStatsPercentage, }, } js, err := json.Marshal(st) if err != nil { w.WriteHeader(http.StatusInternalServerError) logutil.BgLogger().Error("encode json failed", zap.Error(err)) return } _, err = w.Write(js) terror.Log(errors.Trace(err)) } func (s *Server) newStatsHandler() *optimizor.StatsHandler { store, ok := s.driver.(*TiDBDriver) if !ok { panic("Illegal driver") } do, err := session.GetDomain(store.store) if err != nil { panic("Failed to get domain") } return optimizor.NewStatsHandler(do) } func (s *Server) newStatsHistoryHandler() *optimizor.StatsHistoryHandler { store, ok := s.driver.(*TiDBDriver) if !ok { panic("Illegal driver") } do, err := session.GetDomain(store.store) if err != nil { panic("Failed to get domain") } return optimizor.NewStatsHistoryHandler(do) } func (s *Server) newStatsPriorityQueueHandler() *optimizor.StatsPriorityQueueHandler { store, ok := s.driver.(*TiDBDriver) if !ok { panic("Illegal driver") } do, err := session.GetDomain(store.store) if err != nil { panic("Failed to get domain") } return optimizor.NewStatsPriorityQueueHandler(do) } var traceeventCounter = make(chan struct{}, 1) func traceeventHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) return } select { case traceeventCounter <- struct{}{}: default: http.Error(w, "http api /debug/traceevent only allow one request at a time", http.StatusTooManyRequests) return } defer func() { <-traceeventCounter }() var cfg traceevent.FlightRecorderConfig cfg.Initialize() body, _ := io.ReadAll(r.Body) if len(body) > 0 { if err := json.Unmarshal(body, &cfg); err != nil { http.Error(w, fmt.Sprintf("decode json failed: %s", err.Error()), http.StatusBadRequest) return } } logutil.BgLogger().Info("http api /debug/traceevent called", zap.Any("config", cfg)) ch := make(chan []traceevent.Event, 256) recorder, err := traceevent.StartHTTPFlightRecorder(ch, &cfg) if err != nil { http.Error(w, fmt.Sprintf("validate config failed: %s", err.Error()), http.StatusBadRequest) return } defer recorder.Close() for { select { case events := <-ch: res := traceevent.ConvertEventsForRendering(events) fmt.Fprintf(w, "data: ") enc := json.NewEncoder(w) err := enc.Encode(res) if err != nil { logutil.BgLogger().Info("http api /debug/traceevent encode fail", zap.Error(err)) } fmt.Fprintf(w, "\n\n") flusher.Flush() case <-r.Context().Done(): logutil.BgLogger().Info("http api /debug/traceevent client disconnected") return } } }