Files
tidb/pkg/domain/extract.go

531 lines
14 KiB
Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package domain
import (
"archive/zip"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"math/rand"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/metadef"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)
const (
// ExtractMetaFile indicates meta file for extract
ExtractMetaFile = "extract_meta.txt"
)
const (
// ExtractTaskType indicates type of extract task
ExtractTaskType = "taskType"
// ExtractPlanTaskSkipStats indicates skip stats for extract plan task
ExtractPlanTaskSkipStats = "SkipStats"
)
// ExtractType indicates type
type ExtractType uint8
const (
// ExtractPlanType indicates extract plan task
ExtractPlanType ExtractType = iota
)
func taskTypeToString(t ExtractType) string {
if t == ExtractPlanType {
return "Plan"
}
return "Unknown"
}
// ExtractHandle handles the extractWorker to run extract the information task like Plan or any others.
// extractHandle will provide 2 mode for extractWorker:
// 1. submit a background extract task, the response will be returned after the task is started to be solved
// 2. submit a task and wait until the task is solved, the result will be returned to the response.
type ExtractHandle struct {
worker *extractWorker
}
// newExtractHandler new extract handler
func newExtractHandler(ctx context.Context, sctxs []sessionctx.Context) *ExtractHandle {
h := &ExtractHandle{}
h.worker = newExtractWorker(ctx, sctxs[0], false)
return h
}
// ExtractTask extract tasks
func (h *ExtractHandle) ExtractTask(ctx context.Context, task *ExtractTask) (string, error) {
// TODO: support background job later
if task.IsBackgroundJob {
return "", nil
}
return h.worker.extractTask(ctx, task)
}
type extractWorker struct {
ctx context.Context
sctx sessionctx.Context
isBackgroundWorker bool
sync.Mutex
}
// ExtractTask indicates task
type ExtractTask struct {
ExtractType ExtractType
IsBackgroundJob bool
// Param for Extract Plan
SkipStats bool
UseHistoryView bool
// variables for plan task type
Begin time.Time
End time.Time
}
// NewExtractPlanTask returns extract plan task
func NewExtractPlanTask(begin, end time.Time) *ExtractTask {
return &ExtractTask{
Begin: begin,
End: end,
ExtractType: ExtractPlanType,
}
}
func newExtractWorker(
ctx context.Context,
sctx sessionctx.Context,
isBackgroundWorker bool,
) *extractWorker {
return &extractWorker{
ctx: ctx,
sctx: sctx,
isBackgroundWorker: isBackgroundWorker,
}
}
func (w *extractWorker) extractTask(ctx context.Context, task *ExtractTask) (string, error) {
if task.ExtractType == ExtractPlanType {
return w.extractPlanTask(ctx, task)
}
return "", errors.New("unknown extract task")
}
func (w *extractWorker) extractPlanTask(ctx context.Context, task *ExtractTask) (string, error) {
if task.UseHistoryView && !config.GetGlobalConfig().Instance.StmtSummaryEnablePersistent {
return "", errors.New("tidb_stmt_summary_enable_persistent should be enabled for extract task")
}
records, err := w.collectRecords(ctx, task)
if err != nil {
logutil.BgLogger().Error("collect stmt summary records failed for extract plan task", zap.Error(err))
return "", err
}
p, err := w.packageExtractPlanRecords(ctx, records)
if err != nil {
logutil.BgLogger().Error("package stmt summary records failed for extract plan task", zap.Error(err))
return "", err
}
return w.dumpExtractPlanPackage(task, p)
}
func (w *extractWorker) collectRecords(ctx context.Context, task *ExtractTask) (map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, error) {
w.Lock()
defer w.Unlock()
exec := w.sctx.GetRestrictedSQLExecutor()
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
sourceTable := "STATEMENTS_SUMMARY_HISTORY"
if !task.UseHistoryView {
sourceTable = "STATEMENTS_SUMMARY"
}
rows, _, err := exec.ExecRestrictedSQL(ctx1, nil, fmt.Sprintf("SELECT STMT_TYPE, DIGEST, PLAN_DIGEST,QUERY_SAMPLE_TEXT, BINARY_PLAN, TABLE_NAMES, SAMPLE_USER FROM INFORMATION_SCHEMA.%s WHERE SUMMARY_END_TIME > '%s' AND SUMMARY_BEGIN_TIME < '%s'",
sourceTable, task.Begin.Format(types.TimeFormat), task.End.Format(types.TimeFormat)))
if err != nil {
return nil, err
}
collectMap := make(map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, 0)
for _, row := range rows {
record := &stmtSummaryHistoryRecord{}
record.stmtType = row.GetString(0)
record.digest = row.GetString(1)
record.planDigest = row.GetString(2)
record.sql = row.GetString(3)
record.binaryPlan = row.GetString(4)
tableNames := row.GetString(5)
key := stmtSummaryHistoryKey{
digest: record.digest,
planDigest: record.planDigest,
}
record.userName = row.GetString(6)
record.tables = make([]tableNamePair, 0)
setRecord, err := w.handleTableNames(tableNames, record)
if err != nil {
return nil, err
}
if setRecord && checkRecordValid(record) {
collectMap[key] = record
}
}
return collectMap, nil
}
func (w *extractWorker) handleTableNames(tableNames string, record *stmtSummaryHistoryRecord) (bool, error) {
is := GetDomain(w.sctx).InfoSchema()
for _, t := range strings.Split(tableNames, ",") {
names := strings.Split(t, ".")
if len(names) != 2 {
return false, nil
}
dbName := names[0]
tblName := names[1]
record.schemaName = dbName
// skip internal schema record
switch strings.ToLower(record.schemaName) {
case metadef.PerformanceSchemaName.L, metadef.InformationSchemaName.L, metadef.MetricSchemaName.L, "mysql":
return false, nil
}
exists := is.TableExists(ast.NewCIStr(dbName), ast.NewCIStr(tblName))
if !exists {
return false, nil
}
t, err := is.TableByName(w.ctx, ast.NewCIStr(dbName), ast.NewCIStr(tblName))
if err != nil {
return false, err
}
record.tables = append(record.tables, tableNamePair{DBName: dbName, TableName: tblName, IsView: t.Meta().IsView()})
}
return true, nil
}
func checkRecordValid(r *stmtSummaryHistoryRecord) bool {
if r.stmtType != "Select" {
return false
}
if r.schemaName == "" {
return false
}
if r.planDigest == "" {
return false
}
return true
}
func (w *extractWorker) packageExtractPlanRecords(ctx context.Context, records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord) (*extractPlanPackage, error) {
p := &extractPlanPackage{}
p.records = records
p.tables = make(map[tableNamePair]struct{}, 0)
for _, record := range records {
// skip the sql which has been cut off
if strings.Contains(record.sql, "(len:") {
record.skip = true
continue
}
plan, err := w.decodeBinaryPlan(ctx, record.binaryPlan)
if err != nil {
return nil, err
}
record.plan = plan
for _, tbl := range record.tables {
p.tables[tbl] = struct{}{}
}
}
if err := w.handleIsView(ctx, p); err != nil {
return nil, err
}
return p, nil
}
func (w *extractWorker) handleIsView(ctx context.Context, p *extractPlanPackage) error {
is := GetDomain(w.sctx).InfoSchema()
tne := &tableNameExtractor{
ctx: ctx,
executor: w.sctx.GetRestrictedSQLExecutor(),
is: is,
curDB: ast.NewCIStr(""),
names: make(map[tableNamePair]struct{}),
cteNames: make(map[string]struct{}),
}
for v := range p.tables {
if v.IsView {
v, err := is.TableByName(w.ctx, ast.NewCIStr(v.DBName), ast.NewCIStr(v.TableName))
if err != nil {
return err
}
sql := v.Meta().View.SelectStmt
node, err := tne.executor.ParseWithParams(tne.ctx, sql)
if err != nil {
return err
}
node.Accept(tne)
}
}
if tne.err != nil {
return tne.err
}
r, err := tne.getTablesAndViews()
if err != nil {
return err
}
for t := range r {
p.tables[t] = struct{}{}
}
return nil
}
func (w *extractWorker) decodeBinaryPlan(ctx context.Context, bPlan string) (string, error) {
exec := w.sctx.GetRestrictedSQLExecutor()
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
rows, _, err := exec.ExecRestrictedSQL(ctx1, nil, fmt.Sprintf("SELECT tidb_decode_binary_plan('%s')", bPlan))
if err != nil {
return "", err
}
plan := rows[0].GetString(0)
return strings.Trim(plan, "\n"), nil
}
// dumpExtractPlanPackage will dump the information about sqls collected in stmt_summary_history
// The files will be organized into the following format:
/*
|-extract_meta.txt
|-meta.txt
|-config.toml
|-variables.toml
|-bindings.sql
|-schema
| |-schema_meta.txt
| |-db1.table1.schema.txt
| |-db2.table2.schema.txt
| |-....
|-view
| |-db1.view1.view.txt
| |-db2.view2.view.txt
| |-....
|-stats
| |-stats1.json
| |-stats2.json
| |-....
|-table_tiflash_replica.txt
|-sql
| |-digest1.sql
| |-digest2.sql
| |-....
|-skippedSQLs
| |-digest1.sql
| |-...
*/
func (w *extractWorker) dumpExtractPlanPackage(task *ExtractTask, p *extractPlanPackage) (name string, err error) {
f, name, err := GenerateExtractFile()
if err != nil {
return "", err
}
zw := zip.NewWriter(f)
defer func() {
if err != nil {
logutil.BgLogger().Error("dump extract plan task failed", zap.Error(err))
}
if err1 := zw.Close(); err1 != nil {
logutil.BgLogger().Warn("close zip file failed", zap.String("file", name), zap.Error(err))
}
if err1 := f.Close(); err1 != nil {
logutil.BgLogger().Warn("close file failed", zap.String("file", name), zap.Error(err))
}
}()
// Dump config
if err = dumpConfig(zw); err != nil {
return "", err
}
// Dump meta
if err = dumpMeta(zw); err != nil {
return "", err
}
// dump extract plan task meta
if err = dumpExtractMeta(task, zw); err != nil {
return "", err
}
// Dump Schema and View
if err = dumpSchemas(w.sctx, zw, p.tables); err != nil {
return "", err
}
// Dump tables tiflash replicas
if err = dumpTiFlashReplica(w.sctx, zw, p.tables); err != nil {
return "", err
}
// Dump variables
if err = dumpVariables(w.sctx, w.sctx.GetSessionVars(), zw); err != nil {
return "", err
}
// Dump global bindings
if err = dumpGlobalBindings(w.sctx, zw); err != nil {
return "", err
}
// Dump stats
if !task.SkipStats {
if _, err = dumpStats(zw, p.tables, GetDomain(w.sctx), 0); err != nil {
return "", err
}
}
// Dump sqls and plan
if err = dumpSQLRecords(p.records, zw); err != nil {
return "", err
}
return name, nil
}
func dumpSQLRecords(records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, zw *zip.Writer) error {
for key, record := range records {
if record.skip {
err := dumpSQLRecord(record, fmt.Sprintf("skippedSQLs/%v.json", key.digest), zw)
if err != nil {
return err
}
} else {
err := dumpSQLRecord(record, fmt.Sprintf("SQLs/%v.json", key.digest), zw)
if err != nil {
return err
}
}
}
return nil
}
type singleSQLRecord struct {
Schema string `json:"schema"`
Plan string `json:"plan"`
SQL string `json:"sql"`
Digest string `json:"digest"`
BinaryPlan string `json:"binaryPlan"`
UserName string `json:"userName"`
}
// dumpSQLRecord dumps sql records into one file for each record, the format is in json.
func dumpSQLRecord(record *stmtSummaryHistoryRecord, path string, zw *zip.Writer) error {
zf, err := zw.Create(path)
if err != nil {
return err
}
singleSQLRecord := &singleSQLRecord{
Schema: record.schemaName,
Plan: record.plan,
SQL: record.sql,
Digest: record.digest,
BinaryPlan: record.binaryPlan,
UserName: record.userName,
}
content, err := json.Marshal(singleSQLRecord)
if err != nil {
return err
}
_, err = zf.Write(content)
if err != nil {
return err
}
return nil
}
func dumpExtractMeta(task *ExtractTask, zw *zip.Writer) error {
cf, err := zw.Create(ExtractMetaFile)
if err != nil {
return errors.AddStack(err)
}
varMap := make(map[string]string)
varMap[ExtractTaskType] = taskTypeToString(task.ExtractType)
if task.ExtractType == ExtractPlanType {
varMap[ExtractPlanTaskSkipStats] = strconv.FormatBool(task.SkipStats)
}
if err := toml.NewEncoder(cf).Encode(varMap); err != nil {
return errors.AddStack(err)
}
return nil
}
type extractPlanPackage struct {
tables map[tableNamePair]struct{}
records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord
}
type stmtSummaryHistoryKey struct {
digest string
planDigest string
}
type stmtSummaryHistoryRecord struct {
stmtType string
schemaName string
tables []tableNamePair
digest string
planDigest string
sql string
binaryPlan string
userName string
plan string
skip bool
}
// GenerateExtractFile generates extract stmt file
func GenerateExtractFile() (*os.File, string, error) {
path := GetExtractTaskDirName()
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return nil, "", errors.AddStack(err)
}
fileName, err := generateExtractStmtFile()
if err != nil {
return nil, "", errors.AddStack(err)
}
zf, err := os.Create(filepath.Join(path, fileName))
if err != nil {
return nil, "", errors.AddStack(err)
}
return zf, fileName, err
}
func generateExtractStmtFile() (string, error) {
// Generate key and create zip file
time := time.Now().UnixNano()
b := make([]byte, 16)
//nolint: gosec
_, err := rand.Read(b)
if err != nil {
return "", err
}
key := base64.URLEncoding.EncodeToString(b)
return fmt.Sprintf("extract_%v_%v.zip", key, time), nil
}
// GetExtractTaskDirName get extract dir name
func GetExtractTaskDirName() string {
tidbLogDir := filepath.Dir(config.GetGlobalConfig().Log.File.Filename)
return filepath.Join(tidbLogDir, "extract")
}