// Copyright 2019 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. package executor import ( "context" "fmt" "math" "net/url" "strings" "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/infoschema" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/sqlexec" "github.com/prometheus/client_golang/api" promv1 "github.com/prometheus/client_golang/api/prometheus/v1" pmodel "github.com/prometheus/common/model" ) const promReadTimeout = time.Second * 10 // MetricRetriever uses to read metric data. type MetricRetriever struct { dummyCloser table *model.TableInfo tblDef *infoschema.MetricTableDef extractor *plannercore.MetricTableExtractor retrieved bool } func (e *MetricRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) { if e.retrieved || e.extractor.SkipRequest { return nil, nil } e.retrieved = true tblDef, err := infoschema.GetMetricTableDef(e.table.Name.L) if err != nil { return nil, err } e.tblDef = tblDef queryRange := e.getQueryRange(sctx) totalRows := make([][]types.Datum, 0) quantiles := e.extractor.Quantiles if len(quantiles) == 0 { quantiles = []float64{tblDef.Quantile} } for _, quantile := range quantiles { var queryValue pmodel.Value // Add retry to avoid network error. for i := 0; i < 10; i++ { queryValue, err = e.queryMetric(ctx, sctx, queryRange, quantile) if err == nil || strings.Contains(err.Error(), "parse error") { break } time.Sleep(100 * time.Millisecond) } if err != nil { return nil, err } partRows := e.genRows(queryValue, queryRange, quantile) totalRows = append(totalRows, partRows...) } return totalRows, nil } func (e *MetricRetriever) queryMetric(ctx context.Context, sctx sessionctx.Context, queryRange promv1.Range, quantile float64) (pmodel.Value, error) { failpoint.InjectContext(ctx, "mockMetricRetrieverQueryPromQL", func() { failpoint.Return(ctx.Value("__mockMetricsData").(pmodel.Matrix), nil) }) addr, err := e.getMetricAddr(sctx) if err != nil { return nil, err } queryClient, err := newQueryClient(addr) if err != nil { return nil, err } promQLAPI := promv1.NewAPI(queryClient) ctx, cancel := context.WithTimeout(ctx, promReadTimeout) defer cancel() promQL := e.tblDef.GenPromQL(sctx, e.extractor.LabelConditions, quantile) result, _, err := promQLAPI.QueryRange(ctx, promQL, queryRange) return result, err } func (e *MetricRetriever) getMetricAddr(sctx sessionctx.Context) (string, error) { // Get PD servers info. store := sctx.GetStore() etcd, ok := store.(tikv.EtcdBackend) if !ok { return "", errors.Errorf("%T not an etcd backend", store) } for _, addr := range etcd.EtcdAddrs() { return addr, nil } return "", errors.Errorf("pd address was not found") } type promQLQueryRange = promv1.Range func (e *MetricRetriever) getQueryRange(sctx sessionctx.Context) promQLQueryRange { startTime, endTime := e.extractor.StartTime, e.extractor.EndTime step := time.Second * time.Duration(sctx.GetSessionVars().MetricSchemaStep) return promQLQueryRange{Start: startTime, End: endTime, Step: step} } func (e *MetricRetriever) genRows(value pmodel.Value, r promQLQueryRange, quantile float64) [][]types.Datum { var rows [][]types.Datum switch value.Type() { case pmodel.ValMatrix: matrix := value.(pmodel.Matrix) for _, m := range matrix { for _, v := range m.Values { record := e.genRecord(m.Metric, v, r, quantile) rows = append(rows, record) } } } return rows } func (e *MetricRetriever) genRecord(metric pmodel.Metric, pair pmodel.SamplePair, r promQLQueryRange, quantile float64) []types.Datum { record := make([]types.Datum, 0, 2+len(e.tblDef.Labels)+1) // Record order should keep same with genColumnInfos. record = append(record, types.NewTimeDatum(types.NewTime( types.FromGoTime(time.Unix(int64(pair.Timestamp/1000), int64(pair.Timestamp%1000)*1e6)), mysql.TypeDatetime, types.MaxFsp, ))) if math.IsNaN(float64(pair.Value)) { record = append(record, types.NewDatum(nil)) } else { record = append(record, types.NewFloat64Datum(float64(pair.Value))) } for _, label := range e.tblDef.Labels { v := "" if metric != nil { v = string(metric[pmodel.LabelName(label)]) } if len(v) == 0 { v = infoschema.GenLabelConditionValues(e.extractor.LabelConditions[strings.ToLower(label)]) } record = append(record, types.NewStringDatum(v)) } if e.tblDef.Quantile > 0 { record = append(record, types.NewFloat64Datum(quantile)) } return record } type queryClient struct { api.Client } func newQueryClient(addr string) (api.Client, error) { promClient, err := api.NewClient(api.Config{ Address: fmt.Sprintf("http://%s", addr), }) if err != nil { return nil, err } return &queryClient{ promClient, }, nil } // URL implement the api.Client interface. // This is use to convert prometheus api path to PD API path. func (c *queryClient) URL(ep string, args map[string]string) *url.URL { // FIXME: add `PD-Allow-follower-handle: true` in http header, let pd follower can handle this request too. ep = strings.Replace(ep, "api/v1", "pd/api/v1/metric", 1) return c.Client.URL(ep, args) } // MetricSummaryRetriever uses to read metric data. type MetricSummaryRetriever struct { dummyCloser table *model.TableInfo extractor *plannercore.MetricTableExtractor retrieved bool } func (e *MetricSummaryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) { if e.retrieved || e.extractor.SkipRequest { return nil, nil } e.retrieved = true totalRows := make([][]types.Datum, 0, len(infoschema.MetricTableMap)) quantiles := []float64{1, 0.999, 0.99, 0.90, 0.80} tps := make([]*types.FieldType, 0, len(e.table.Columns)) for _, col := range e.table.Columns { tps = append(tps, &col.FieldType) } startTime := e.extractor.StartTime.Format(plannercore.MetricTableTimeFormat) endTime := e.extractor.EndTime.Format(plannercore.MetricTableTimeFormat) for name, def := range infoschema.MetricTableMap { sqls := e.genMetricQuerySQLS(name, startTime, endTime, def.Quantile, quantiles) for _, sql := range sqls { rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql) if err != nil { return nil, errors.Trace(err) } for _, row := range rows { totalRows = append(totalRows, row.GetDatumRow(tps)) } } } return totalRows, nil } func (e *MetricSummaryRetriever) genMetricQuerySQLS(name, startTime, endTime string, quantile float64, quantiles []float64) []string { if quantile == 0 { sql := fmt.Sprintf(`select "%s",min(time),sum(value),avg(value),min(value),max(value) from metric_schema.%s where time > '%s' and time < '%s'`, name, name, startTime, endTime) return []string{sql} } sqls := []string{} for _, quantile := range quantiles { sql := fmt.Sprintf(`select "%s_%v",min(time),sum(value),avg(value),min(value),max(value) from metric_schema.%s where time > '%s' and time < '%s' and quantile=%v`, name, quantile, name, startTime, endTime, quantile) sqls = append(sqls, sql) } return sqls }