Files
tidb/executor/slow_query.go
2020-02-18 18:54:05 +08:00

468 lines
15 KiB
Go

// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"bufio"
"context"
"io"
"os"
"strconv"
"strings"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/plancodec"
"go.uber.org/zap"
)
// SlowQueryRetriever is used to read slow log data.
// It opens the session's slow query file on the first retrieve call and then
// returns parsed rows batch by batch on successive calls until the file is
// exhausted.
type SlowQueryRetriever struct {
// table is the table being read; its column count decides whether rows need
// projecting, and its name decides whether host info is appended to rows.
table *model.TableInfo
// outputCols lists the columns to return; each column's Offset indexes into
// the full parsed row.
outputCols []*model.ColumnInfo
// retrieved is set once the parser hits io.EOF; later retrieve calls return
// nil rows.
retrieved bool
// initialized records that the slow query file has been opened.
initialized bool
// file is the opened slow query file; it is closed by close.
file *os.File
// fileLine is the line counter returned by the last ParseSlowLog call and
// passed back in as the starting line number of the next call.
fileLine int
}
// retrieve returns the next batch of slow log rows, restricted to e.outputCols.
//
// The slow query file named by the session variables is opened lazily on the
// first call. Once the end of the file has been reached (e.retrieved is set),
// every later call returns (nil, nil). When all table columns are requested
// the parsed rows are returned as-is; otherwise each row is projected onto the
// requested columns using their Offset.
func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	if e.retrieved {
		return nil, nil
	}

	if !e.initialized {
		var openErr error
		if e.file, openErr = os.Open(sctx.GetSessionVars().SlowQueryFile); openErr != nil {
			return nil, openErr
		}
		e.initialized = true
	}

	fullRows, err := e.dataForSlowLog(sctx)
	if err != nil {
		return nil, err
	}

	// Nothing to project when every column of the table is requested.
	if len(e.outputCols) == len(e.table.Columns) {
		return fullRows, nil
	}

	projected := make([][]types.Datum, 0, len(fullRows))
	for _, src := range fullRows {
		dst := make([]types.Datum, 0, len(e.outputCols))
		for _, col := range e.outputCols {
			dst = append(dst, src[col.Offset])
		}
		projected = append(projected, dst)
	}
	return projected, nil
}
// close closes the slow query file if one was opened.
// A failure to close is only logged; the returned error is always nil.
func (e *SlowQueryRetriever) close() error {
	if e.file == nil {
		return nil
	}
	if closeErr := e.file.Close(); closeErr != nil {
		logutil.BgLogger().Error("close slow log file failed.", zap.Error(closeErr))
	}
	return nil
}
// dataForSlowLog parses the next batch (at most 1024 rows) of slow log entries
// from e.file.
//
// Entries logged by other users are filtered out unless the session holds the
// PROCESS privilege or has no user set. When the parser reaches io.EOF,
// e.retrieved is set and the rows parsed so far are still returned; any other
// parse error is returned to the caller. If the table being read is the
// cluster slow log table, the rows are passed through
// infoschema.AppendHostInfoToRows before being returned.
func (e *SlowQueryRetriever) dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) {
	var hasProcessPriv bool
	if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
		hasProcessPriv = pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv)
	}
	user := ctx.GetSessionVars().User
	checkValid := func(userName string) bool {
		return hasProcessPriv || user == nil || userName == user.Username
	}

	reader := bufio.NewReader(e.file)
	rows, fileLine, err := ParseSlowLog(ctx, reader, e.fileLine, 1024, checkValid)
	if err != nil {
		if err != io.EOF {
			return nil, err
		}
		e.retrieved = true
	} else if unread := reader.Buffered(); unread > 0 {
		// The buffered reader has read ahead of the last parsed line, so the
		// file offset is past data that has not been parsed yet. A new reader
		// is created on every call, so rewind the file by the unread amount;
		// otherwise the next batch would silently skip those bytes.
		if _, seekErr := e.file.Seek(-int64(unread), io.SeekCurrent); seekErr != nil {
			return nil, seekErr
		}
	}
	e.fileLine = fileLine

	if e.table.Name.L == strings.ToLower(infoschema.ClusterTableSlowLog) {
		return infoschema.AppendHostInfoToRows(rows)
	}
	return rows, nil
}
// checkValidFunc reports whether a slow log entry recorded for the given user
// name may be returned. The parser drops an entry when this returns false; a
// nil checkValidFunc accepts every entry.
type checkValidFunc func(string) bool
// ParseSlowLog exports for testing.
//
// It reads lines from reader and assembles slow log entries into rows until
// maxRow rows have been collected or reading a line fails. An entry begins
// with a line carrying variable.SlowLogStartPrefixStr, continues with lines
// carrying variable.SlowLogRowPrefixStr (either the previous statement or
// space-separated "name: value" pairs), and ends with a line carrying
// variable.SlowLogSQLSuffixStr. Field parse errors are appended to the
// statement context as warnings instead of aborting the parse. Entries
// rejected by checkValid are skipped.
//
// It returns the collected rows, the line counter (starting from fileLine and
// incremented once per read attempt), and the read error, if any (io.EOF at
// the end of the input).
// TODO: optimize for parse huge log-file.
func ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, fileLine, maxRow int, checkValid checkValidFunc) ([][]types.Datum, int, error) {
	var (
		rows    [][]types.Datum
		entry   *slowQueryTuple
		inEntry bool // true while the lines of an accepted entry are being consumed
	)
	lineNum := fileLine
	tz := ctx.GetSessionVars().Location()

	for len(rows) < maxRow {
		lineNum++
		raw, err := getOneLine(reader)
		if err != nil {
			return rows, lineNum, err
		}
		line := string(hack.String(raw))

		switch {
		case !inEntry && strings.HasPrefix(line, variable.SlowLogStartPrefixStr):
			// Start of a new entry: the rest of the line is the timestamp.
			entry = &slowQueryTuple{}
			ok, err := entry.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], lineNum, checkValid)
			if err != nil {
				ctx.GetSessionVars().StmtCtx.AppendWarning(err)
				continue
			}
			inEntry = ok

		case !inEntry:
			// Not inside an entry and not a start line: nothing to do.

		case strings.HasPrefix(line, variable.SlowLogRowPrefixStr):
			body := line[len(variable.SlowLogRowPrefixStr):]
			if strings.HasPrefix(body, variable.SlowLogPrevStmtPrefix) {
				entry.prevStmt = body[len(variable.SlowLogPrevStmtPrefix):]
				continue
			}
			// The line is a sequence of "name: value" pairs separated by spaces.
			tokens := strings.Split(body, " ")
			for i := 0; i+1 < len(tokens); i += 2 {
				name := strings.TrimSuffix(tokens[i], ":")
				ok, err := entry.setFieldValue(tz, name, tokens[i+1], lineNum, checkValid)
				if err != nil {
					ctx.GetSessionVars().StmtCtx.AppendWarning(err)
					continue
				}
				if !ok {
					// The entry was rejected; stop collecting it.
					inEntry = false
				}
			}

		case strings.HasSuffix(line, variable.SlowLogSQLSuffixStr):
			// The SQL text closes the entry.
			_, err = entry.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), lineNum, checkValid)
			if err != nil {
				ctx.GetSessionVars().StmtCtx.AppendWarning(err)
				continue
			}
			if checkValid == nil || checkValid(entry.user) {
				rows = append(rows, entry.convertToDatumRow())
			}
			inEntry = false

		default:
			// Unrecognized line inside an entry: abandon the entry.
			inEntry = false
		}
	}
	return rows, lineNum, nil
}
// getOneLine reads one complete line from reader, joining the fragments that
// ReadLine returns when the line is longer than the reader's buffer.
//
// The returned slice is a private copy. An error is returned when reading
// fails (including io.EOF) or when the accumulated line grows beyond
// variable.MaxOfMaxAllowedPacket bytes; the bytes gathered so far are returned
// alongside the error.
func getOneLine(reader *bufio.Reader) ([]byte, error) {
	chunk, more, err := reader.ReadLine()

	// Copy the fragment out of the reader's buffer, which is reused by later
	// reads. Reserve extra room when more fragments are known to follow.
	capacity := len(chunk)
	if more {
		capacity = len(chunk) * 2
	}
	line := make([]byte, len(chunk), capacity)
	copy(line, chunk)
	if err != nil {
		return line, err
	}

	for more {
		chunk, more, err = reader.ReadLine()
		line = append(line, chunk...)
		// Use the max value of max_allowed_packet to check the single line length.
		if len(line) > int(variable.MaxOfMaxAllowedPacket) {
			return line, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
		}
		if err != nil {
			return line, err
		}
	}
	return line, err
}
// slowQueryTuple holds the fields of a single slow log entry while it is being
// parsed. setFieldValue fills the fields one at a time from the log text and
// convertToDatumRow turns the finished tuple into a row, emitting the fields
// in its own fixed order.
type slowQueryTuple struct {
// time is parsed by ParseTime and converted to the caller-supplied location.
time time.Time
txnStartTs uint64
// user and host come from splitting the user field value on the first "@".
user string
host string
connID uint64
queryTime float64
parseTime float64
compileTime float64
preWriteTime float64
binlogPrewriteTime float64
commitTime float64
getCommitTSTime float64
commitBackoffTime float64
backoffTypes string
resolveLockTime float64
localLatchWaitTime float64
writeKeys uint64
writeSize uint64
prewriteRegion uint64
txnRetry uint64
processTime float64
waitTime float64
backOffTime float64
lockKeysTime float64
requestCount uint64
totalKeys uint64
processKeys uint64
db string
// indexIDs stores the value of the variable.SlowLogIndexNamesStr field.
indexIDs string
digest string
statsInfo string
avgProcessTime float64
p90ProcessTime float64
maxProcessTime float64
maxProcessAddress string
avgWaitTime float64
p90WaitTime float64
maxWaitTime float64
maxWaitAddress string
memMax int64
// prevStmt is set directly by ParseSlowLog from the previous-statement line.
prevStmt string
sql string
// isInternal is true only when the logged value is exactly "true".
isInternal bool
// succ is emitted by convertToDatumRow as the integer 1 or 0.
succ bool
// plan keeps the raw logged value; convertToDatumRow passes it through
// parsePlan before emitting it.
plan string
planDigest string
}
// setFieldValue stores value into the tuple field identified by the slow log
// field name. Unknown field names are ignored.
//
// tz is the location the entry time is converted to. lineNum is only used to
// build the error message. checkValid, when non-nil, is consulted as soon as
// the user field is seen.
//
// valid is false only when the user field was processed and checkValid
// rejected the user; it is true in every other case. err is non-nil when the
// value cannot be parsed for the field's type; it wraps the underlying error
// with the line number and field name.
func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string, lineNum int, checkValid checkValidFunc) (valid bool, err error) {
	valid = true
	switch field {
	case variable.SlowLogTimeStr:
		st.time, err = ParseTime(value)
		if err != nil {
			break
		}
		if st.time.Location() != tz {
			st.time = st.time.In(tz)
		}
	case variable.SlowLogTxnStartTSStr:
		st.txnStartTs, err = strconv.ParseUint(value, 10, 64)
	case variable.SlowLogUserStr:
		// The value has the form "user@host"; the "@host" part may be missing.
		// The length checks must look at the split result (fields), not at the
		// field name, otherwise a value without "@" indexes out of range.
		fields := strings.SplitN(value, "@", 2)
		if len(fields) > 0 {
			st.user = fields[0]
		}
		if len(fields) > 1 {
			st.host = fields[1]
		}
		if checkValid != nil {
			valid = checkValid(st.user)
		}
	case variable.SlowLogConnIDStr:
		st.connID, err = strconv.ParseUint(value, 10, 64)
	case variable.SlowLogQueryTimeStr:
		st.queryTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogParseTimeStr:
		st.parseTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogCompileTimeStr:
		st.compileTime, err = strconv.ParseFloat(value, 64)
	case execdetails.PreWriteTimeStr:
		st.preWriteTime, err = strconv.ParseFloat(value, 64)
	case execdetails.BinlogPrewriteTimeStr:
		st.binlogPrewriteTime, err = strconv.ParseFloat(value, 64)
	case execdetails.CommitTimeStr:
		st.commitTime, err = strconv.ParseFloat(value, 64)
	case execdetails.GetCommitTSTimeStr:
		st.getCommitTSTime, err = strconv.ParseFloat(value, 64)
	case execdetails.CommitBackoffTimeStr:
		st.commitBackoffTime, err = strconv.ParseFloat(value, 64)
	case execdetails.BackoffTypesStr:
		st.backoffTypes = value
	case execdetails.ResolveLockTimeStr:
		st.resolveLockTime, err = strconv.ParseFloat(value, 64)
	case execdetails.LocalLatchWaitTimeStr:
		st.localLatchWaitTime, err = strconv.ParseFloat(value, 64)
	case execdetails.WriteKeysStr:
		st.writeKeys, err = strconv.ParseUint(value, 10, 64)
	case execdetails.WriteSizeStr:
		st.writeSize, err = strconv.ParseUint(value, 10, 64)
	case execdetails.PrewriteRegionStr:
		st.prewriteRegion, err = strconv.ParseUint(value, 10, 64)
	case execdetails.TxnRetryStr:
		st.txnRetry, err = strconv.ParseUint(value, 10, 64)
	case execdetails.ProcessTimeStr:
		st.processTime, err = strconv.ParseFloat(value, 64)
	case execdetails.WaitTimeStr:
		st.waitTime, err = strconv.ParseFloat(value, 64)
	case execdetails.BackoffTimeStr:
		st.backOffTime, err = strconv.ParseFloat(value, 64)
	case execdetails.LockKeysTimeStr:
		st.lockKeysTime, err = strconv.ParseFloat(value, 64)
	case execdetails.RequestCountStr:
		st.requestCount, err = strconv.ParseUint(value, 10, 64)
	case execdetails.TotalKeysStr:
		st.totalKeys, err = strconv.ParseUint(value, 10, 64)
	case execdetails.ProcessKeysStr:
		st.processKeys, err = strconv.ParseUint(value, 10, 64)
	case variable.SlowLogDBStr:
		st.db = value
	case variable.SlowLogIndexNamesStr:
		st.indexIDs = value
	case variable.SlowLogIsInternalStr:
		st.isInternal = value == "true"
	case variable.SlowLogDigestStr:
		st.digest = value
	case variable.SlowLogStatsInfoStr:
		st.statsInfo = value
	case variable.SlowLogCopProcAvg:
		st.avgProcessTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogCopProcP90:
		st.p90ProcessTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogCopProcMax:
		st.maxProcessTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogCopProcAddr:
		st.maxProcessAddress = value
	case variable.SlowLogCopWaitAvg:
		st.avgWaitTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogCopWaitP90:
		st.p90WaitTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogCopWaitMax:
		st.maxWaitTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogCopWaitAddr:
		st.maxWaitAddress = value
	case variable.SlowLogMemMax:
		st.memMax, err = strconv.ParseInt(value, 10, 64)
	case variable.SlowLogSucc:
		st.succ, err = strconv.ParseBool(value)
	case variable.SlowLogPlan:
		st.plan = value
	case variable.SlowLogPlanDigest:
		st.planDigest = value
	case variable.SlowLogQuerySQLStr:
		st.sql = value
	}
	if err != nil {
		return valid, errors.Wrap(err, "Parse slow log at line "+strconv.FormatInt(int64(lineNum), 10)+" failed. Field: `"+field+"`, error")
	}
	return valid, nil
}
// convertToDatumRow flattens the tuple into a row of datums.
//
// The entry time is emitted as a datetime datum with types.MaxFsp, succ is
// emitted as the integer 1 or 0, and the plan is passed through parsePlan
// first. The order of the values below is the column order of the resulting
// row.
func (st *slowQueryTuple) convertToDatumRow() []types.Datum {
	var succ int64
	if st.succ {
		succ = 1
	}

	record := make([]types.Datum, 0, 64)
	record = append(record,
		types.NewTimeDatum(types.NewTime(types.FromGoTime(st.time), mysql.TypeDatetime, types.MaxFsp)),
		types.NewUintDatum(st.txnStartTs),
		types.NewStringDatum(st.user),
		types.NewStringDatum(st.host),
		types.NewUintDatum(st.connID),
		types.NewFloat64Datum(st.queryTime),
		types.NewFloat64Datum(st.parseTime),
		types.NewFloat64Datum(st.compileTime),
		types.NewFloat64Datum(st.preWriteTime),
		types.NewFloat64Datum(st.binlogPrewriteTime),
		types.NewFloat64Datum(st.commitTime),
		types.NewFloat64Datum(st.getCommitTSTime),
		types.NewFloat64Datum(st.commitBackoffTime),
		types.NewStringDatum(st.backoffTypes),
		types.NewFloat64Datum(st.resolveLockTime),
		types.NewFloat64Datum(st.localLatchWaitTime),
		types.NewUintDatum(st.writeKeys),
		types.NewUintDatum(st.writeSize),
		types.NewUintDatum(st.prewriteRegion),
		types.NewUintDatum(st.txnRetry),
		types.NewFloat64Datum(st.processTime),
		types.NewFloat64Datum(st.waitTime),
		types.NewFloat64Datum(st.backOffTime),
		types.NewFloat64Datum(st.lockKeysTime),
		types.NewUintDatum(st.requestCount),
		types.NewUintDatum(st.totalKeys),
		types.NewUintDatum(st.processKeys),
		types.NewStringDatum(st.db),
		types.NewStringDatum(st.indexIDs),
		types.NewDatum(st.isInternal),
		types.NewStringDatum(st.digest),
		types.NewStringDatum(st.statsInfo),
		types.NewFloat64Datum(st.avgProcessTime),
		types.NewFloat64Datum(st.p90ProcessTime),
		types.NewFloat64Datum(st.maxProcessTime),
		types.NewStringDatum(st.maxProcessAddress),
		types.NewFloat64Datum(st.avgWaitTime),
		types.NewFloat64Datum(st.p90WaitTime),
		types.NewFloat64Datum(st.maxWaitTime),
		types.NewStringDatum(st.maxWaitAddress),
		types.NewIntDatum(st.memMax),
		types.NewIntDatum(succ),
		types.NewStringDatum(parsePlan(st.plan)),
		types.NewStringDatum(st.planDigest),
		types.NewStringDatum(st.prevStmt),
		types.NewStringDatum(st.sql),
	)
	return record
}
// parsePlan strips the slow log plan prefix and suffix from planString (by
// length) and decodes what remains with plancodec.DecodePlan.
//
// A string no longer than prefix plus suffix is returned unchanged. If
// decoding fails, the failure is logged and the stripped but still encoded
// text is returned.
func parsePlan(planString string) string {
	prefixLen, suffixLen := len(variable.SlowLogPlanPrefix), len(variable.SlowLogPlanSuffix)
	if len(planString) <= prefixLen+suffixLen {
		return planString
	}

	encoded := planString[prefixLen : len(planString)-suffixLen]
	decoded, err := plancodec.DecodePlan(encoded)
	if err != nil {
		logutil.BgLogger().Error("decode plan in slow log failed", zap.String("plan", encoded), zap.Error(err))
		return encoded
	}
	return decoded
}
// ParseTime exports for testing.
//
// It parses s with logutil.SlowLogTimeFormat and, if that fails, falls back to
// logutil.OldSlowLogTimeFormat for compatibility. When both attempts fail, the
// returned error reports the input, the current format and the error from the
// fallback attempt.
func ParseTime(s string) (time.Time, error) {
	t, err := time.Parse(logutil.SlowLogTimeFormat, s)
	if err == nil {
		return t, nil
	}

	// This is for compatibility.
	t, err = time.Parse(logutil.OldSlowLogTimeFormat, s)
	if err != nil {
		err = errors.Errorf("string \"%v\" doesn't has a prefix that matches format \"%v\", err: %v", s, logutil.SlowLogTimeFormat, err)
	}
	return t, err
}