Files
tidb/executor/slow_query.go

782 lines
25 KiB
Go

// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"bufio"
"context"
"io"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/infoschema"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/plancodec"
"go.uber.org/zap"
)
// slowQueryRetriever is used to read slow log data.
// The slow log files are parsed by a background goroutine (see
// initializeAsyncParsing) and the parsed batches are handed to retrieve
// through parsedSlowLogCh.
type slowQueryRetriever struct {
// table is the table being read; its column count and name are consulted
// when building the result rows.
table *model.TableInfo
// outputCols lists the columns to return; retrieve projects full rows onto
// these columns by their Offset.
outputCols []*model.ColumnInfo
// initialized records that initialize has already run.
initialized bool
// extractor optionally carries the time range used to filter log files and
// log entries; it may be nil.
extractor *plannercore.SlowQueryExtractor
// files are the opened slow log files to parse.
files []logFile
// fileIdx is the index in files of the file currently being parsed.
fileIdx int
// fileLine is the line number within the current file; it is reported in
// parse error messages.
fileLine int
// checker holds the privilege and time-range checks applied to each entry.
checker *slowLogChecker
// parsedSlowLogCh delivers parsed batches from the parsing goroutine; it is
// closed once parsing has finished.
parsedSlowLogCh chan parsedSlowLog
}
// retrieve returns the next batch of slow log rows, projected onto
// e.outputCols. On the first call it initializes the retriever and starts the
// asynchronous parser. It returns (nil, nil) once all slow log data has been
// consumed.
func (e *slowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	if !e.initialized {
		if err := e.initialize(sctx); err != nil {
			return nil, err
		}
		e.initializeAsyncParsing(ctx, sctx)
	}

	fullRows, finished, err := e.dataForSlowLog(ctx)
	switch {
	case err != nil:
		return nil, err
	case finished:
		return nil, nil
	case len(e.outputCols) == len(e.table.Columns):
		// Every column is requested, so the full rows can be returned as is.
		return fullRows, nil
	}

	// Only a subset of columns is requested: pick them out by column offset.
	projected := make([][]types.Datum, 0, len(fullRows))
	for _, fullRow := range fullRows {
		picked := make([]types.Datum, 0, len(e.outputCols))
		for _, col := range e.outputCols {
			picked = append(picked, fullRow[col.Offset])
		}
		projected = append(projected, picked)
	}
	return projected, nil
}
// initialize builds the slow log checker from the session (privilege, user and
// the optional time range of the extractor), marks the retriever as
// initialized and opens the slow log files that need to be parsed.
func (e *slowQueryRetriever) initialize(sctx sessionctx.Context) error {
	checker := &slowLogChecker{user: sctx.GetSessionVars().User}
	if pm := privilege.GetPrivilegeManager(sctx); pm != nil {
		checker.hasProcessPriv = pm.RequestVerification(sctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv)
	}
	if extractor := e.extractor; extractor != nil {
		checker.enableTimeCheck = extractor.Enable
		checker.startTime = types.NewTime(types.FromGoTime(extractor.StartTime), mysql.TypeDatetime, types.MaxFsp)
		checker.endTime = types.NewTime(types.FromGoTime(extractor.EndTime), mysql.TypeDatetime, types.MaxFsp)
	}
	e.checker = checker

	// The retriever is flagged as initialized before the files are opened, so
	// the flag is set even when opening them fails.
	e.initialized = true
	files, err := e.getAllFiles(sctx, sctx.GetSessionVars().SlowQueryFile)
	e.files = files
	return err
}
// close closes every opened slow log file. Close failures are only logged;
// the returned error is always nil.
func (e *slowQueryRetriever) close() error {
	for i := range e.files {
		if err := e.files[i].file.Close(); err != nil {
			logutil.BgLogger().Error("close slow log file failed.", zap.Error(err))
		}
	}
	return nil
}
// parsedSlowLog is one batch produced by the parsing goroutine and sent over
// slowQueryRetriever.parsedSlowLogCh.
type parsedSlowLog struct {
// rows are the rows parsed for this batch.
rows [][]types.Datum
// err is the error returned by parseSlowLog for this batch, if any.
err error
}
// parseDataForSlowLog runs in its own goroutine (see initializeAsyncParsing).
// It parses the slow log files in batches of up to 1024 rows and sends each
// batch, together with any parse error, to e.parsedSlowLogCh. The channel is
// always closed before the function returns, which tells the consumer that no
// more data will arrive.
//
// Parsing stops as soon as ctx is cancelled. A bare `break` inside `select`
// would only leave the select statement and keep the loop running, so the
// cancellation branch returns instead.
func (e *slowQueryRetriever) parseDataForSlowLog(ctx context.Context, sctx sessionctx.Context) {
	defer close(e.parsedSlowLogCh)
	if len(e.files) == 0 {
		return
	}
	reader := bufio.NewReader(e.files[0].file)
	for e.fileIdx < len(e.files) {
		rows, err := e.parseSlowLog(sctx, reader, 1024)
		select {
		case <-ctx.Done():
			// The consumer is gone; stop parsing instead of draining the files.
			return
		case e.parsedSlowLogCh <- parsedSlowLog{rows, err}:
		}
	}
}
// dataForSlowLog waits for the next parsed batch from e.parsedSlowLogCh.
// It returns the rows of the batch, a flag that is true once the channel has
// been closed (all data retrieved), and an error. If ctx is done first,
// ctx.Err() is returned. For the cluster slow log table, host information is
// appended to every row.
func (e *slowQueryRetriever) dataForSlowLog(ctx context.Context) ([][]types.Datum, bool, error) {
	var batch parsedSlowLog
	select {
	case <-ctx.Done():
		return nil, false, ctx.Err()
	case received, open := <-e.parsedSlowLogCh:
		if !open {
			// When e.parsedSlowLogCh is closed, the slow log data is retrieved.
			return nil, true, nil
		}
		batch = received
	}

	if batch.err != nil {
		return nil, false, batch.err
	}
	if e.table.Name.L != strings.ToLower(infoschema.ClusterTableSlowLog) {
		return batch.rows, false, nil
	}
	withHost, err := infoschema.AppendHostInfoToRows(batch.rows)
	return withHost, false, err
}
// slowLogChecker decides whether a slow log entry may be returned, based on
// the privileges of the current user and an optional time range.
type slowLogChecker struct {
// The fields below are used to check privileges.
hasProcessPriv bool
user *auth.UserIdentity
// The fields below are used to check whether the slow log time is inside
// the [startTime, endTime] range; the check only applies when
// enableTimeCheck is true.
enableTimeCheck bool
startTime types.Time
endTime types.Time
}
// hasPrivilege reports whether the entries of userName may be read: always
// when the process privilege is held or no user is set on the checker,
// otherwise only when userName matches the checker's user name.
func (sc *slowLogChecker) hasPrivilege(userName string) bool {
	if sc.hasProcessPriv || sc.user == nil {
		return true
	}
	return sc.user.Username == userName
}
// isTimeValid reports whether t passes the time check: either the check is
// disabled, or t lies within [startTime, endTime] (both ends inclusive).
func (sc *slowLogChecker) isTimeValid(t types.Time) bool {
	if !sc.enableTimeCheck {
		return true
	}
	return t.Compare(sc.startTime) >= 0 && t.Compare(sc.endTime) <= 0
}
// parseSlowLog reads lines from reader and turns slow log entries into rows.
// It returns once maxRow rows have been collected, once the last file in
// e.files has been read to EOF, or when reading a line fails with an error
// other than io.EOF (the rows collected so far are returned with that error).
// Errors from parsing individual fields do not stop parsing; they are appended
// to the statement context as warnings.
//
// The method advances e.fileIdx / e.fileLine and resets reader onto the next
// file when the current one is exhausted.
//
// TODO: optimize for parsing huge log files.
func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, maxRow int) ([][]types.Datum, error) {
var rows [][]types.Datum
var st *slowQueryTuple
// startFlag is true while the lines being read belong to an entry whose
// start line was parsed and accepted by the checker.
startFlag := false
tz := ctx.GetSessionVars().Location()
for {
if len(rows) >= maxRow {
return rows, nil
}
e.fileLine++
lineByte, err := getOneLine(reader)
if err != nil {
if err == io.EOF {
// The current file is exhausted: move on to the next file, if any.
e.fileIdx++
e.fileLine = 0
if e.fileIdx >= len(e.files) {
return rows, nil
}
reader.Reset(e.files[e.fileIdx].file)
continue
}
return rows, err
}
line := string(hack.String(lineByte))
// Check slow log entry start flag.
if !startFlag && strings.HasPrefix(line, variable.SlowLogStartPrefixStr) {
st = &slowQueryTuple{}
valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], e.fileLine, e.checker)
if err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
// Only entries whose time passes the checker are parsed further.
if valid {
startFlag = true
}
continue
}
if startFlag {
// Parse slow log field.
if strings.HasPrefix(line, variable.SlowLogRowPrefixStr) {
line = line[len(variable.SlowLogRowPrefixStr):]
if strings.HasPrefix(line, variable.SlowLogPrevStmtPrefix) {
st.prevStmt = line[len(variable.SlowLogPrevStmtPrefix):]
} else if strings.HasPrefix(line, variable.SlowLogUserAndHostStr+variable.SlowLogSpaceMarkStr) {
// the user and hostname field has a special format, for example, # User@Host: root[root] @ localhost [127.0.0.1]
value := line[len(variable.SlowLogUserAndHostStr+variable.SlowLogSpaceMarkStr):]
valid, err := st.setFieldValue(tz, variable.SlowLogUserAndHostStr, value, e.fileLine, e.checker)
if err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
// The checker rejected this user: skip the rest of the entry.
if !valid {
startFlag = false
}
} else {
// Generic row: space-separated "field: value" pairs.
fieldValues := strings.Split(line, " ")
for i := 0; i < len(fieldValues)-1; i += 2 {
field := fieldValues[i]
if strings.HasSuffix(field, ":") {
field = field[:len(field)-1]
}
valid, err := st.setFieldValue(tz, field, fieldValues[i+1], e.fileLine, e.checker)
if err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
// This continue only skips to the next field/value pair of the line.
continue
}
if !valid {
startFlag = false
}
}
}
} else if strings.HasSuffix(line, variable.SlowLogSQLSuffixStr) {
if strings.HasPrefix(line, "use") {
// `use DB` statements in the slow log are kept for compatibility with MySQL.
// Since we already get the current DB from the `# DB` field, we can ignore it here,
// please see https://github.com/pingcap/tidb/issues/17846 for more details.
continue
}
// Get the sql string, and mark the start flag to false.
_, err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), e.fileLine, e.checker)
if err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
// The SQL line ends the entry; emit it only if the user may see it.
if e.checker.hasPrivilege(st.user) {
rows = append(rows, st.convertToDatumRow())
}
startFlag = false
} else {
// Neither a field row nor a SQL line: abandon the current entry.
startFlag = false
}
}
}
}
// getOneLine reads one complete line from reader, joining the fragments that
// bufio.Reader.ReadLine returns when a line does not fit into its buffer.
// The returned slice is a copy that does not alias the reader's internal
// buffer. An error is returned when reading fails or when the accumulated
// line grows beyond variable.MaxOfMaxAllowedPacket; the bytes read so far are
// returned alongside the error.
func getOneLine(reader *bufio.Reader) ([]byte, error) {
	chunk, more, err := reader.ReadLine()

	// Reserve extra room up front when further fragments are known to follow.
	capacity := len(chunk)
	if more {
		capacity *= 2
	}
	// Copy, because chunk is only valid until the next read on the reader.
	line := make([]byte, len(chunk), capacity)
	copy(line, chunk)
	if err != nil {
		return line, err
	}

	for more {
		chunk, more, err = reader.ReadLine()
		line = append(line, chunk...)
		// Use the max value of max_allowed_packet to check the single line length.
		if len(line) > int(variable.MaxOfMaxAllowedPacket) {
			return line, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
		}
		if err != nil {
			return line, err
		}
	}
	return line, nil
}
// slowQueryTuple holds the values parsed from one slow log entry.
// setFieldValue fills the fields one by one while the entry is being read and
// convertToDatumRow turns the tuple into a row of datums.
type slowQueryTuple struct {
time types.Time
txnStartTs uint64
user string
host string
connID uint64
queryTime float64
parseTime float64
compileTime float64
rewriteTime float64
preprocSubqueries uint64
preprocSubQueryTime float64
optimizeTime float64
waitTSTime float64
preWriteTime float64
waitPrewriteBinlogTime float64
commitTime float64
getCommitTSTime float64
commitBackoffTime float64
backoffTypes string
resolveLockTime float64
localLatchWaitTime float64
writeKeys uint64
writeSize uint64
prewriteRegion uint64
txnRetry uint64
copTime float64
processTime float64
waitTime float64
backOffTime float64
lockKeysTime float64
requestCount uint64
totalKeys uint64
processKeys uint64
db string
indexIDs string
digest string
statsInfo string
avgProcessTime float64
p90ProcessTime float64
maxProcessTime float64
maxProcessAddress string
avgWaitTime float64
p90WaitTime float64
maxWaitTime float64
maxWaitAddress string
memMax int64
diskMax int64
// prevStmt is taken from the row carrying variable.SlowLogPrevStmtPrefix.
prevStmt string
sql string
isInternal bool
succ bool
planFromCache bool
// plan is stored as it appears in the log; convertToDatumRow passes it
// through parsePlan before emitting it.
plan string
planDigest string
}
// setFieldValue parses value according to the slow log field name and stores
// the result in the matching member of st. Unknown field names are ignored.
//
// Parameters:
//   - tz: the location the entry time is converted to.
//   - field, value: the field name and its raw textual value from the log.
//   - lineNum: the line number in the log file, used in error messages only.
//   - checker: optional; when non-nil it validates the entry time and the
//     user of the entry.
//
// It returns valid == false when the checker rejects the entry (time out of
// range or user not visible to the current session), and a non-nil error,
// annotated with the line number and field name, when value cannot be parsed.
func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string, lineNum int, checker *slowLogChecker) (valid bool, err error) {
	valid = true
	switch field {
	case variable.SlowLogTimeStr:
		var t time.Time
		t, err = ParseTime(value)
		if err != nil {
			break
		}
		st.time = types.NewTime(types.FromGoTime(t), mysql.TypeDatetime, types.MaxFsp)
		if checker != nil {
			valid = checker.isTimeValid(st.time)
		}
		if t.Location() != tz {
			t = t.In(tz)
			st.time = types.NewTime(types.FromGoTime(t), mysql.TypeDatetime, types.MaxFsp)
		}
	case variable.SlowLogTxnStartTSStr:
		st.txnStartTs, err = strconv.ParseUint(value, 10, 64)
	case variable.SlowLogUserStr:
		// the old User format is kept for compatibility
		fields := strings.SplitN(value, "@", 2)
		// The length checks must look at the split result, not at the field
		// name: a value without "@" yields a single element and indexing
		// fields[1] would panic.
		if len(fields) > 0 {
			st.user = fields[0]
		}
		if len(fields) > 1 {
			st.host = fields[1]
		}
		if checker != nil {
			valid = checker.hasPrivilege(st.user)
		}
	case variable.SlowLogUserAndHostStr:
		// the new User&Host format: root[root] @ localhost [127.0.0.1]
		fields := strings.SplitN(value, "@", 2)
		if len(fields) > 0 {
			tmp := strings.Split(fields[0], "[")
			st.user = strings.TrimSpace(tmp[0])
		}
		if len(fields) > 1 {
			tmp := strings.Split(fields[1], "[")
			st.host = strings.TrimSpace(tmp[0])
		}
		if checker != nil {
			valid = checker.hasPrivilege(st.user)
		}
	case variable.SlowLogConnIDStr:
		st.connID, err = strconv.ParseUint(value, 10, 64)
	case variable.SlowLogQueryTimeStr:
		st.queryTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogParseTimeStr:
		st.parseTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogCompileTimeStr:
		st.compileTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogOptimizeTimeStr:
		st.optimizeTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogWaitTSTimeStr:
		st.waitTSTime, err = strconv.ParseFloat(value, 64)
	case execdetails.PreWriteTimeStr:
		st.preWriteTime, err = strconv.ParseFloat(value, 64)
	case execdetails.WaitPrewriteBinlogTimeStr:
		st.waitPrewriteBinlogTime, err = strconv.ParseFloat(value, 64)
	case execdetails.CommitTimeStr:
		st.commitTime, err = strconv.ParseFloat(value, 64)
	case execdetails.GetCommitTSTimeStr:
		st.getCommitTSTime, err = strconv.ParseFloat(value, 64)
	case execdetails.CommitBackoffTimeStr:
		st.commitBackoffTime, err = strconv.ParseFloat(value, 64)
	case execdetails.BackoffTypesStr:
		st.backoffTypes = value
	case execdetails.ResolveLockTimeStr:
		st.resolveLockTime, err = strconv.ParseFloat(value, 64)
	case execdetails.LocalLatchWaitTimeStr:
		st.localLatchWaitTime, err = strconv.ParseFloat(value, 64)
	case execdetails.WriteKeysStr:
		st.writeKeys, err = strconv.ParseUint(value, 10, 64)
	case execdetails.WriteSizeStr:
		st.writeSize, err = strconv.ParseUint(value, 10, 64)
	case execdetails.PrewriteRegionStr:
		st.prewriteRegion, err = strconv.ParseUint(value, 10, 64)
	case execdetails.TxnRetryStr:
		st.txnRetry, err = strconv.ParseUint(value, 10, 64)
	case execdetails.CopTimeStr:
		st.copTime, err = strconv.ParseFloat(value, 64)
	case execdetails.ProcessTimeStr:
		st.processTime, err = strconv.ParseFloat(value, 64)
	case execdetails.WaitTimeStr:
		st.waitTime, err = strconv.ParseFloat(value, 64)
	case execdetails.BackoffTimeStr:
		st.backOffTime, err = strconv.ParseFloat(value, 64)
	case execdetails.LockKeysTimeStr:
		st.lockKeysTime, err = strconv.ParseFloat(value, 64)
	case execdetails.RequestCountStr:
		st.requestCount, err = strconv.ParseUint(value, 10, 64)
	case execdetails.TotalKeysStr:
		st.totalKeys, err = strconv.ParseUint(value, 10, 64)
	case execdetails.ProcessKeysStr:
		st.processKeys, err = strconv.ParseUint(value, 10, 64)
	case variable.SlowLogDBStr:
		st.db = value
	case variable.SlowLogIndexNamesStr:
		st.indexIDs = value
	case variable.SlowLogIsInternalStr:
		st.isInternal = value == "true"
	case variable.SlowLogDigestStr:
		st.digest = value
	case variable.SlowLogStatsInfoStr:
		st.statsInfo = value
	case variable.SlowLogCopProcAvg:
		st.avgProcessTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogCopProcP90:
		st.p90ProcessTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogCopProcMax:
		st.maxProcessTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogCopProcAddr:
		st.maxProcessAddress = value
	case variable.SlowLogCopWaitAvg:
		st.avgWaitTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogCopWaitP90:
		st.p90WaitTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogCopWaitMax:
		st.maxWaitTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogCopWaitAddr:
		st.maxWaitAddress = value
	case variable.SlowLogMemMax:
		st.memMax, err = strconv.ParseInt(value, 10, 64)
	case variable.SlowLogSucc:
		st.succ, err = strconv.ParseBool(value)
	case variable.SlowLogPlanFromCache:
		st.planFromCache, err = strconv.ParseBool(value)
	case variable.SlowLogPlan:
		st.plan = value
	case variable.SlowLogPlanDigest:
		st.planDigest = value
	case variable.SlowLogQuerySQLStr:
		st.sql = value
	case variable.SlowLogDiskMax:
		st.diskMax, err = strconv.ParseInt(value, 10, 64)
	case variable.SlowLogRewriteTimeStr:
		st.rewriteTime, err = strconv.ParseFloat(value, 64)
	case variable.SlowLogPreprocSubQueriesStr:
		st.preprocSubqueries, err = strconv.ParseUint(value, 10, 64)
	case variable.SlowLogPreProcSubQueryTimeStr:
		st.preprocSubQueryTime, err = strconv.ParseFloat(value, 64)
	}
	if err != nil {
		return valid, errors.Wrap(err, "Parse slow log at line "+strconv.FormatInt(int64(lineNum), 10)+" failed. Field: `"+field+"`, error")
	}
	return valid, nil
}
// convertToDatumRow converts the tuple into a row of datums. The order of the
// datums is fixed by this function; boolean succ / planFromCache are emitted
// as the integers 1 or 0 and the plan is passed through parsePlan first.
func (st *slowQueryTuple) convertToDatumRow() []types.Datum {
	// flag renders a boolean as an integer datum (1 for true, 0 for false).
	flag := func(b bool) types.Datum {
		if b {
			return types.NewIntDatum(1)
		}
		return types.NewIntDatum(0)
	}

	row := make([]types.Datum, 0, 64)
	row = append(row,
		types.NewTimeDatum(st.time),
		types.NewUintDatum(st.txnStartTs),
		types.NewStringDatum(st.user),
		types.NewStringDatum(st.host),
		types.NewUintDatum(st.connID),
		types.NewFloat64Datum(st.queryTime),
		types.NewFloat64Datum(st.parseTime),
		types.NewFloat64Datum(st.compileTime),
		types.NewFloat64Datum(st.rewriteTime),
		types.NewUintDatum(st.preprocSubqueries),
		types.NewFloat64Datum(st.preprocSubQueryTime),
		types.NewFloat64Datum(st.optimizeTime),
		types.NewFloat64Datum(st.waitTSTime),
		types.NewFloat64Datum(st.preWriteTime),
		types.NewFloat64Datum(st.waitPrewriteBinlogTime),
		types.NewFloat64Datum(st.commitTime),
		types.NewFloat64Datum(st.getCommitTSTime),
		types.NewFloat64Datum(st.commitBackoffTime),
		types.NewStringDatum(st.backoffTypes),
		types.NewFloat64Datum(st.resolveLockTime),
		types.NewFloat64Datum(st.localLatchWaitTime),
		types.NewUintDatum(st.writeKeys),
		types.NewUintDatum(st.writeSize),
		types.NewUintDatum(st.prewriteRegion),
		types.NewUintDatum(st.txnRetry),
		types.NewFloat64Datum(st.copTime),
		types.NewFloat64Datum(st.processTime),
		types.NewFloat64Datum(st.waitTime),
		types.NewFloat64Datum(st.backOffTime),
		types.NewFloat64Datum(st.lockKeysTime),
		types.NewUintDatum(st.requestCount),
		types.NewUintDatum(st.totalKeys),
		types.NewUintDatum(st.processKeys),
		types.NewStringDatum(st.db),
		types.NewStringDatum(st.indexIDs),
		types.NewDatum(st.isInternal),
		types.NewStringDatum(st.digest),
		types.NewStringDatum(st.statsInfo),
		types.NewFloat64Datum(st.avgProcessTime),
		types.NewFloat64Datum(st.p90ProcessTime),
		types.NewFloat64Datum(st.maxProcessTime),
		types.NewStringDatum(st.maxProcessAddress),
		types.NewFloat64Datum(st.avgWaitTime),
		types.NewFloat64Datum(st.p90WaitTime),
		types.NewFloat64Datum(st.maxWaitTime),
		types.NewStringDatum(st.maxWaitAddress),
		types.NewIntDatum(st.memMax),
		types.NewIntDatum(st.diskMax),
		flag(st.succ),
		flag(st.planFromCache),
		types.NewStringDatum(parsePlan(st.plan)),
		types.NewStringDatum(st.planDigest),
		types.NewStringDatum(st.prevStmt),
		types.NewStringDatum(st.sql),
	)
	return row
}
// parsePlan strips the slow log plan prefix and suffix from planString and
// decodes the remainder with plancodec.DecodePlan. Strings that are not longer
// than prefix plus suffix are returned unchanged. When decoding fails the
// failure is logged and the stripped, still encoded, string is returned.
func parsePlan(planString string) string {
	prefixLen, suffixLen := len(variable.SlowLogPlanPrefix), len(variable.SlowLogPlanSuffix)
	if len(planString) <= prefixLen+suffixLen {
		return planString
	}
	encoded := planString[prefixLen : len(planString)-suffixLen]
	decoded, err := plancodec.DecodePlan(encoded)
	if err != nil {
		logutil.BgLogger().Error("decode plan in slow log failed", zap.String("plan", encoded), zap.Error(err))
		return encoded
	}
	return decoded
}
// ParseTime parses a slow log timestamp. It first tries
// logutil.SlowLogTimeFormat and then falls back to
// logutil.OldSlowLogTimeFormat for compatibility. It is exported for testing.
func ParseTime(s string) (time.Time, error) {
	if parsed, err := time.Parse(logutil.SlowLogTimeFormat, s); err == nil {
		return parsed, nil
	}
	// This is for compatibility.
	parsed, err := time.Parse(logutil.OldSlowLogTimeFormat, s)
	if err != nil {
		err = errors.Errorf("string \"%v\" doesn't has a prefix that matches format \"%v\", err: %v", s, logutil.SlowLogTimeFormat, err)
	}
	return parsed, err
}
// logFile is an opened slow log file together with the time range of the
// entries it contains. start and end are left at their zero value when the
// file is opened without time-range filtering (see getAllFiles).
type logFile struct {
file *os.File // The opened file handle
start, end time.Time // The start/end time of the log file
}
// getAllFiles returns the opened slow log files that need to be parsed.
//
// Without an enabled time-range extractor only logFilePath itself is opened.
// Otherwise the directory of logFilePath is walked and every file whose path
// starts with logFilePath minus its extension is probed: files whose time
// range does not overlap [checker.startTime, checker.endTime] are closed and
// skipped. The result is sorted by the start time of the files. Failures on a
// single file are reported as statement warnings instead of errors.
func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath string) ([]logFile, error) {
	if e.extractor == nil || !e.extractor.Enable {
		current, err := os.Open(logFilePath)
		if err != nil {
			return nil, err
		}
		return []logFile{{file: current}}, nil
	}

	// All rotated log files have the same prefix with the original file.
	prefix := strings.TrimSuffix(logFilePath, filepath.Ext(logFilePath))

	// warn records err as a warning for usability; io.EOF is dropped silently.
	warn := func(err error) {
		if err != io.EOF {
			sctx.GetSessionVars().StmtCtx.AppendWarning(err)
		}
	}

	var candidates []logFile
	visit := func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			warn(walkErr)
			return nil
		}
		if info.IsDir() || !strings.HasPrefix(path, prefix) {
			return nil
		}
		file, err := os.OpenFile(path, os.O_RDONLY, os.ModePerm)
		if err != nil {
			warn(err)
			return nil
		}
		// The file is closed on every exit path unless it ends up in the result.
		keep := false
		defer func() {
			if !keep {
				terror.Log(file.Close())
			}
		}()

		// Skip files that start after the requested range ends.
		fileStartTime, err := e.getFileStartTime(file)
		if err != nil {
			warn(err)
			return nil
		}
		if types.NewTime(types.FromGoTime(fileStartTime), mysql.TypeDatetime, types.MaxFsp).Compare(e.checker.endTime) > 0 {
			return nil
		}

		// Skip files that end before the requested range starts.
		fileEndTime, err := e.getFileEndTime(file)
		if err != nil {
			warn(err)
			return nil
		}
		if types.NewTime(types.FromGoTime(fileEndTime), mysql.TypeDatetime, types.MaxFsp).Compare(e.checker.startTime) < 0 {
			return nil
		}

		// Rewind so that parsing starts from the beginning of the file.
		if _, err = file.Seek(0, io.SeekStart); err != nil {
			warn(err)
			return nil
		}
		candidates = append(candidates, logFile{
			file:  file,
			start: fileStartTime,
			end:   fileEndTime,
		})
		keep = true
		return nil
	}

	walkErr := filepath.Walk(filepath.Dir(logFilePath), visit)
	// Sort by start time
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].start.Before(candidates[j].start)
	})
	return candidates, walkErr
}
// getFileStartTime returns the time of the first slow log entry in file.
// It rewinds the file and inspects at most 128 lines looking for a line with
// the slow log start prefix; if none is found the file is reported as
// malformed. Read errors (including io.EOF) are returned as is.
func (e *slowQueryRetriever) getFileStartTime(file *os.File) (time.Time, error) {
	var zero time.Time
	if _, err := file.Seek(0, io.SeekStart); err != nil {
		return zero, err
	}
	reader := bufio.NewReader(file)
	const maxProbeLines = 128
	for probed := 0; probed < maxProbeLines; probed++ {
		raw, err := getOneLine(reader)
		if err != nil {
			return zero, err
		}
		if text := string(raw); strings.HasPrefix(text, variable.SlowLogStartPrefixStr) {
			return ParseTime(text[len(variable.SlowLogStartPrefixStr):])
		}
	}
	return zero, errors.Errorf("malform slow query file %v", file.Name())
}
// getFileEndTime returns the time of the last slow log entry in file.
// It scans the file backwards from its end, one byte at a time, assembling
// lines until it finds one with the slow log start prefix. The search gives up
// with a "malform slow query file" error after 128 lines or when the beginning
// of the file is reached without a match. Seek/read errors are returned as is.
func (e *slowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) {
var t time.Time
stat, err := file.Stat()
if err != nil {
return t, err
}
fileSize := stat.Size()
// cursor is the (negative) offset relative to the end of the file.
cursor := int64(0)
// line accumulates the bytes of the current line in reverse order.
line := make([]byte, 0, 64)
maxLineNum := 128
// tryGetTime reverses line in place (the bytes were collected back to
// front), trims surrounding whitespace and returns the text following the
// slow log start prefix, or "" if the line does not start with it.
tryGetTime := func(line []byte) string {
for i, j := 0, len(line)-1; i < j; i, j = i+1, j-1 {
line[i], line[j] = line[j], line[i]
}
lineStr := string(line)
lineStr = strings.TrimSpace(lineStr)
if strings.HasPrefix(lineStr, variable.SlowLogStartPrefixStr) {
return lineStr[len(variable.SlowLogStartPrefixStr):]
}
return ""
}
for {
cursor -= 1
_, err := file.Seek(cursor, io.SeekEnd)
if err != nil {
return t, err
}
char := make([]byte, 1)
_, err = file.Read(char)
if err != nil {
return t, err
}
// A line break marks the start of the line collected so far; the very last
// byte of the file (cursor == -1) is not treated as a line boundary.
if cursor != -1 && (char[0] == '\n' || char[0] == '\r') {
if timeStr := tryGetTime(line); len(timeStr) > 0 {
return ParseTime(timeStr)
}
line = line[:0]
maxLineNum -= 1
}
line = append(line, char[0])
// Stop at the beginning of the file or once the line budget is used up,
// giving the pending line one last chance to match.
if cursor == -fileSize || maxLineNum <= 0 {
if timeStr := tryGetTime(line); len(timeStr) > 0 {
return ParseTime(timeStr)
}
return t, errors.Errorf("malform slow query file %v", file.Name())
}
}
}
// initializeAsyncParsing creates the channel (buffer size 1) used to hand over
// parsed batches and starts the goroutine that parses the slow log files.
func (e *slowQueryRetriever) initializeAsyncParsing(ctx context.Context, sctx sessionctx.Context) {
	batches := make(chan parsedSlowLog, 1)
	e.parsedSlowLogCh = batches
	go e.parseDataForSlowLog(ctx, sctx)
}