Files
tidb/executor/cluster_reader.go

218 lines
5.7 KiB
Go

// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sort"
"strings"
"sync"
"github.com/jeremywohl/flatten"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/infoschema"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/pdapi"
)
type clusterRetriever interface {
retrieve(ctx sessionctx.Context) ([][]types.Datum, error)
}
// ClusterReaderExec executes cluster information retrieving from the cluster components
type ClusterReaderExec struct {
baseExecutor
retrieved bool
retriever clusterRetriever
}
// Next implements the Executor Next interface.
func (e *ClusterReaderExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.retrieved {
req.Reset()
return nil
}
rows, err := e.retriever.retrieve(e.ctx)
if err != nil {
return err
}
e.retrieved = true
if len(rows) == 0 {
req.Reset()
return nil
}
req.GrowAndReset(len(rows))
mutableRow := chunk.MutRowFromTypes(retTypes(e))
for _, row := range rows {
mutableRow.SetDatums(row...)
req.AppendRow(mutableRow.ToRow())
}
return nil
}
type clusterConfigRetriever struct {
extractor *plannercore.ClusterConfigTableExtractor
}
// retrieve implements the clusterRetriever interface
func (e *clusterConfigRetriever) retrieve(ctx sessionctx.Context) ([][]types.Datum, error) {
if e.extractor.SkipRequest {
return nil, nil
}
type result struct {
idx int
rows [][]types.Datum
err error
}
serversInfo, err := infoschema.GetClusterServerInfo(ctx)
failpoint.Inject("mockClusterConfigServerInfo", func(val failpoint.Value) {
if s := val.(string); len(s) > 0 {
serversInfo = serversInfo[:0]
servers := strings.Split(s, ";")
for _, server := range servers {
parts := strings.Split(server, ",")
serversInfo = append(serversInfo, infoschema.ServerInfo{
ServerType: parts[0],
Address: parts[1],
StatusAddr: parts[2],
})
}
// erase the error
err = nil
}
})
if err != nil {
return nil, err
}
var finalRows [][]types.Datum
wg := sync.WaitGroup{}
ch := make(chan result, len(serversInfo))
for i, srv := range serversInfo {
typ := srv.ServerType
// Skip some node type which has been filtered in WHERE cluase
// e.g: SELECT * FROM cluster_config WHERE type='tikv'
if len(e.extractor.NodeTypes) > 0 && !e.extractor.NodeTypes.Exist(typ) {
continue
}
address := srv.Address
// Skip some node address which has been filtered in WHERE cluase
// e.g: SELECT * FROM cluster_config WHERE address='192.16.8.12:2379'
if len(e.extractor.Addresses) > 0 && !e.extractor.Addresses.Exist(address) {
continue
}
statusAddr := srv.StatusAddr
if len(statusAddr) == 0 {
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("%s node %s does not contain status address", typ, address))
continue
}
wg.Add(1)
go func(index int) {
util.WithRecovery(func() {
defer wg.Done()
var url string
switch typ {
case "pd":
url = fmt.Sprintf("http://%s%s", statusAddr, pdapi.Config)
case "tikv", "tidb":
url = fmt.Sprintf("http://%s/config", statusAddr)
default:
ch <- result{err: errors.Errorf("unknown node type: %s(%s)", typ, address)}
return
}
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
ch <- result{err: errors.Trace(err)}
return
}
req.Header.Add("PD-Allow-follower-handle", "true")
resp, err := http.DefaultClient.Do(req)
if err != nil {
ch <- result{err: errors.Trace(err)}
return
}
defer func() {
terror.Log(resp.Body.Close())
}()
if resp.StatusCode != http.StatusOK {
ch <- result{err: errors.Errorf("request %s failed: %s", url, resp.Status)}
return
}
var nested map[string]interface{}
if err = json.NewDecoder(resp.Body).Decode(&nested); err != nil {
ch <- result{err: errors.Trace(err)}
return
}
data, err := flatten.Flatten(nested, "", flatten.DotStyle)
if err != nil {
ch <- result{err: errors.Trace(err)}
return
}
// Sorts by keys and make the result stable
type item struct {
key string
val string
}
var items []item
for key, val := range data {
items = append(items, item{key: key, val: fmt.Sprintf("%v", val)})
}
sort.Slice(items, func(i, j int) bool { return items[i].key < items[j].key })
var rows [][]types.Datum
for _, item := range items {
rows = append(rows, types.MakeDatums(
typ,
address,
item.key,
item.val,
))
}
ch <- result{idx: index, rows: rows}
}, nil)
}(i)
}
wg.Wait()
close(ch)
// Keep the original order to make the result more stable
var results []result
for result := range ch {
if result.err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(result.err)
continue
}
results = append(results, result)
}
sort.Slice(results, func(i, j int) bool { return results[i].idx < results[j].idx })
for _, result := range results {
finalRows = append(finalRows, result.rows...)
}
return finalRows, nil
}