// tidb/pkg/executor/adapter.go
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"context"
"fmt"
"math"
"runtime/trace"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/bindinfo"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/placement"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
executor_metrics "github.com/pingcap/tidb/pkg/executor/metrics"
"github.com/pingcap/tidb/pkg/executor/staticrecordset"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/planner"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
"github.com/pingcap/tidb/pkg/planner/core/resolve"
"github.com/pingcap/tidb/pkg/plugin"
"github.com/pingcap/tidb/pkg/resourcegroup/runaway"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/sessionstates"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
"github.com/pingcap/tidb/pkg/types"
util2 "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/breakpoint"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/hint"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/plancodec"
"github.com/pingcap/tidb/pkg/util/redact"
"github.com/pingcap/tidb/pkg/util/replayer"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tidb/pkg/util/stmtsummary"
stmtsummaryv2 "github.com/pingcap/tidb/pkg/util/stmtsummary/v2"
"github.com/pingcap/tidb/pkg/util/stringutil"
"github.com/pingcap/tidb/pkg/util/topsql"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"github.com/pingcap/tidb/pkg/util/traceevent"
"github.com/pingcap/tidb/pkg/util/tracing"
"github.com/prometheus/client_golang/prometheus"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/oracle"
tikvtrace "github.com/tikv/client-go/v2/trace"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// processinfoSetter is the interface used to set the current running process info.
type processinfoSetter interface {
SetProcessInfo(string, time.Time, byte, uint64)
UpdateProcessInfo()
}
// recordSet wraps an executor and implements the sqlexec.RecordSet interface.
type recordSet struct {
fields []*resolve.ResultField
executor exec.Executor
// The `Fields` method may be called after `Close`, and the executor is cleared in the `Close` function.
// Therefore, we need to store the schema in `recordSet` to avoid a null pointer exception when calling `executor.Schema()`.
schema *expression.Schema
stmt *ExecStmt
lastErrs []error
txnStartTS uint64
once sync.Once
// traceID stores the trace ID for this statement execution.
// It's injected into the context during Next() to ensure trace correlation
// across TiDB -> client-go -> TiKV during lazy execution.
traceID []byte
}
func (a *recordSet) Fields() []*resolve.ResultField {
if len(a.fields) == 0 {
a.fields = colNames2ResultFields(a.schema, a.stmt.OutputNames, a.stmt.Ctx.GetSessionVars().CurrentDB)
}
return a.fields
}
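// colNames2ResultFields converts the executor schema and output names into
// resolve.ResultField entries for the client protocol. It falls back to the
// current database when a name carries a table but no database, and truncates
// over-long column aliases for MySQL compatibility (see issue #10513 below).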
func colNames2ResultFields(schema *expression.Schema, names []*types.FieldName, defaultDB string) []*resolve.ResultField {
rfs := make([]*resolve.ResultField, 0, schema.Len())
defaultDBCIStr := ast.NewCIStr(defaultDB)
for i := range schema.Len() {
dbName := names[i].DBName
if dbName.L == "" && names[i].TblName.L != "" {
dbName = defaultDBCIStr
}
origColName := names[i].OrigColName
emptyOrgName := false
if origColName.L == "" {
origColName = names[i].ColName
emptyOrgName = true
}
rf := &resolve.ResultField{
Column: &model.ColumnInfo{Name: origColName, FieldType: *schema.Columns[i].RetType},
ColumnAsName: names[i].ColName,
EmptyOrgName: emptyOrgName,
Table: &model.TableInfo{Name: names[i].OrigTblName},
TableAsName: names[i].TblName,
DBName: dbName,
}
// This is for compatibility.
// See issue https://github.com/pingcap/tidb/issues/10513 .
if len(rf.ColumnAsName.O) > mysql.MaxAliasIdentifierLen {
rf.ColumnAsName.O = rf.ColumnAsName.O[:mysql.MaxAliasIdentifierLen]
}
// Usually the length of O equals the length of L.
// Add this length check to avoid a panic.
if len(rf.ColumnAsName.L) > mysql.MaxAliasIdentifierLen {
rf.ColumnAsName.L = rf.ColumnAsName.L[:mysql.MaxAliasIdentifierLen]
}
rfs = append(rfs, rf)
}
return rfs
}
// Next uses the recordSet's executor to fetch the next available chunk for later usage.
// If the chunk contains no rows, we copy the current statement's found rows into the session's LastFoundRows,
// because a chunk with 0 rows indicates the current query has finished and we need to prepare for the next one.
// If stmt is not nil and the chunk contains rows, we simply add the number of rows in the chunk to the found-rows count.
func (a *recordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) {
defer func() {
r := recover()
if r == nil {
return
}
err = util2.GetRecoverError(r)
logutil.Logger(ctx).Warn("execute sql panic", zap.String("sql", a.stmt.GetTextToLog(false)), zap.Stack("stack"))
}()
if a.stmt != nil {
if err := a.stmt.Ctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil {
return err
}
}
// Inject trace ID into context for correlation during lazy execution.
// The trace ID was generated when the statement started executing, but the
// context passed to Next() doesn't have it (context is not stored in recordSet).
// We need to re-inject it here so that operations in executors (e.g., TiKV calls)
// can be correlated with this statement's trace ID.
if len(a.traceID) > 0 {
ctx = tikvtrace.ContextWithTraceID(ctx, a.traceID)
}
err = a.stmt.next(ctx, a.executor, req)
if err != nil {
a.lastErrs = append(a.lastErrs, err)
return err
}
numRows := req.NumRows()
if numRows == 0 {
if a.stmt != nil {
a.stmt.Ctx.GetSessionVars().LastFoundRows = a.stmt.Ctx.GetSessionVars().StmtCtx.FoundRows()
}
return nil
}
if a.stmt != nil {
a.stmt.Ctx.GetSessionVars().StmtCtx.AddFoundRows(uint64(numRows))
}
return nil
}
// NewChunk creates a chunk based on the top-level executor's exec.NewFirstChunk().
func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
if alloc == nil {
return exec.NewFirstChunk(a.executor)
}
return alloc.Alloc(a.executor.RetFieldTypes(), a.executor.InitCap(), a.executor.MaxChunkSize())
}
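// Finish closes the wrapped executor exactly once and resets the CTE storage
// map. If the statement was killed while writing the result set to the client,
// a warning is logged to help diagnose queries stuck in the network stack.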
func (a *recordSet) Finish() error {
var err error
a.once.Do(func() {
err = exec.Close(a.executor)
cteErr := resetCTEStorageMap(a.stmt.Ctx)
if cteErr != nil {
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr))
}
if err == nil {
err = cteErr
}
a.executor = nil
if a.stmt != nil {
status := a.stmt.Ctx.GetSessionVars().SQLKiller.GetKillSignal()
inWriteResultSet := a.stmt.Ctx.GetSessionVars().SQLKiller.InWriteResultSet.Load()
if status > 0 && inWriteResultSet {
logutil.BgLogger().Warn("kill, this SQL might be stuck in the network stack while writing packets to the client.", zap.Uint64("connection ID", a.stmt.Ctx.GetSessionVars().ConnectionID))
}
}
})
if err != nil {
a.lastErrs = append(a.lastErrs, err)
}
return err
}
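// Close implements the sqlexec.RecordSet interface. It finishes the executor
// and then closes the statement, reporting all errors accumulated during Next.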
func (a *recordSet) Close() error {
err := a.Finish()
if err != nil {
logutil.BgLogger().Error("close recordSet error", zap.Error(err))
}
a.stmt.CloseRecordSet(a.txnStartTS, errors.Join(a.lastErrs...))
return err
}
// OnFetchReturned implements commandLifeCycle#OnFetchReturned
func (a *recordSet) OnFetchReturned() {
a.stmt.LogSlowQuery(a.txnStartTS, len(a.lastErrs) == 0, true)
}
// TryDetach creates a new `RecordSet` which doesn't depend on the current session context.
func (a *recordSet) TryDetach() (sqlexec.RecordSet, bool, error) {
e, ok := Detach(a.executor)
if !ok {
return nil, false, nil
}
return staticrecordset.New(a.Fields(), e, a.stmt.GetTextToLog(false)), true, nil
}
// GetExecutor4Test exports the internal executor for test purpose.
func (a *recordSet) GetExecutor4Test() any {
return a.executor
}
// TelemetryInfo records some telemetry information during execution.
type TelemetryInfo struct {
UseNonRecursive bool
UseRecursive bool
UseMultiSchemaChange bool
UseExchangePartition bool
UseFlashbackToCluster bool
PartitionTelemetry *PartitionTelemetryInfo
AccountLockTelemetry *AccountLockTelemetryInfo
UseIndexMerge bool
UseTableLookUp atomic.Bool
}
// PartitionTelemetryInfo records table partition telemetry information during execution.
type PartitionTelemetryInfo struct {
UseTablePartition bool
UseTablePartitionList bool
UseTablePartitionRange bool
UseTablePartitionHash bool
UseTablePartitionRangeColumns bool
UseTablePartitionRangeColumnsGt1 bool
UseTablePartitionRangeColumnsGt2 bool
UseTablePartitionRangeColumnsGt3 bool
UseTablePartitionListColumns bool
TablePartitionMaxPartitionsNum uint64
UseCreateIntervalPartition bool
UseAddIntervalPartition bool
UseDropIntervalPartition bool
UseCompactTablePartition bool
UseReorganizePartition bool
}
// AccountLockTelemetryInfo records account lock/unlock information during execution
type AccountLockTelemetryInfo struct {
// The number of CREATE/ALTER USER statements that lock the user
LockUser int64
// The number of CREATE/ALTER USER statements that unlock the user
UnlockUser int64
// The number of CREATE/ALTER USER statements
CreateOrAlterUser int64
}
// ExecStmt implements the sqlexec.Statement interface; it turns a planner.Plan into an sqlexec.Statement.
type ExecStmt struct {
// GoCtx stores parent go context.Context for a stmt.
GoCtx context.Context
// InfoSchema stores a reference to the schema information.
InfoSchema infoschema.InfoSchema
// Plan stores a reference to the final physical plan.
Plan base.Plan
StmtNode ast.StmtNode
Ctx sessionctx.Context
// LowerPriority represents whether to lower the execution priority of a query.
LowerPriority bool
isPreparedStmt bool
isSelectForUpdate bool
retryCount uint
retryStartTime time.Time
// Phase durations are split into two parts: 1. trying to lock keys (but
// failed); 2. the final iteration of the retry loop. Here we use
// [2]time.Duration to record such info for each phase. The first duration
// is increased only within the current iteration. When we meet a
// pessimistic lock error and decide to retry, we add the first duration to
// the second and reset the first to 0 by calling `resetPhaseDurations`.
phaseBuildDurations [2]time.Duration
phaseOpenDurations [2]time.Duration
phaseNextDurations [2]time.Duration
phaseLockDurations [2]time.Duration
// OutputNames will be set if using cached plan
OutputNames []*types.FieldName
PsStmt *plannercore.PlanCacheStmt
Ti *TelemetryInfo
}
// GetStmtNode returns the stmtNode inside Statement
func (a *ExecStmt) GetStmtNode() ast.StmtNode {
return a.StmtNode
}
// PointGet is the short path for executing a point get directly from the plan, keeping only the necessary steps.
func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
r, ctx := tracing.StartRegionEx(ctx, "ExecStmt.PointGet")
defer r.End()
if r.Span != nil {
r.Span.LogKV("sql", a.Text())
}
failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
// stale read should not reach here
staleread.AssertStmtStaleness(a.Ctx, false)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, a.InfoSchema)
})
ctx = a.observeStmtBeginForTopSQL(ctx)
startTs, err := sessiontxn.GetTxnManager(a.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
a.Ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityHigh
var executor exec.Executor
useMaxTS := startTs == math.MaxUint64
// Try to reuse the point get executor.
// We should only use the cached executor when the startTS is MaxUint64.
if a.PsStmt.PointGet.Executor != nil && useMaxTS {
exec, ok := a.PsStmt.PointGet.Executor.(*PointGetExecutor)
if !ok {
logutil.Logger(ctx).Error("invalid executor type, not PointGetExecutor for point get path")
a.PsStmt.PointGet.Executor = nil
} else {
// The CachedPlan type is already checked in the last step.
pointGetPlan := a.Plan.(*physicalop.PointGetPlan)
exec.Recreated(pointGetPlan, a.Ctx)
a.PsStmt.PointGet.Executor = exec
executor = exec
// If the executor is reused, the executor build phase is skipped and the txn is not activated, which
// leaves `TxnCtx.StartTS` at 0.
// So we should set `TxnCtx.StartTS` manually here to make sure it is not 0,
// providing the right value for `@@tidb_last_txn_info` and other variables.
a.Ctx.GetSessionVars().TxnCtx.StartTS = startTs
}
}
if executor == nil {
b := newExecutorBuilder(a.Ctx, a.InfoSchema, a.Ti)
executor = b.build(a.Plan)
if b.err != nil {
return nil, b.err
}
pointExecutor, ok := executor.(*PointGetExecutor)
// Don't cache the executor for non point-get (table dual) or partitioned tables
if ok && useMaxTS && pointExecutor.partitionDefIdx == nil {
a.PsStmt.PointGet.Executor = pointExecutor
}
}
if err = exec.Open(ctx, executor); err != nil {
terror.Log(exec.Close(executor))
return nil, err
}
sctx := a.Ctx
cmd32 := atomic.LoadUint32(&sctx.GetSessionVars().CommandValue)
cmd := byte(cmd32)
var pi processinfoSetter
if raw, ok := sctx.(processinfoSetter); ok {
pi = raw
sql := a.Text()
maxExecutionTime := sctx.GetSessionVars().GetMaxExecutionTime()
// Update processinfo, ShowProcess() will use it.
pi.SetProcessInfo(sql, time.Now(), cmd, maxExecutionTime)
if sctx.GetSessionVars().StmtCtx.StmtType == "" {
sctx.GetSessionVars().StmtCtx.StmtType = stmtctx.GetStmtLabel(ctx, a.StmtNode)
}
}
// Extract trace ID from context to store in recordSet for lazy execution
traceID := tikvtrace.TraceIDFromContext(ctx)
return &recordSet{
executor: executor,
schema: executor.Schema(),
stmt: a,
txnStartTS: startTs,
traceID: traceID,
}, nil
}
// OriginText returns the original statement as a string.
func (a *ExecStmt) OriginText() string {
return a.StmtNode.OriginalText()
}
// Text returns the utf8-encoded statement as a string.
func (a *ExecStmt) Text() string {
return a.StmtNode.Text()
}
// IsPrepared returns true if stmt is a prepare statement.
func (a *ExecStmt) IsPrepared() bool {
return a.isPreparedStmt
}
// IsReadOnly returns true if a statement is read only.
// If the current StmtNode is an ExecuteStmt, we can get its prepared stmt,
// then use the ast.IsReadOnly function to determine whether the statement is read only.
func (a *ExecStmt) IsReadOnly(vars *variable.SessionVars) bool {
return plannercore.IsReadOnly(a.StmtNode, vars)
}
// RebuildPlan rebuilds current execute statement plan.
// It returns the current information schema version that 'a' is using.
func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
ret := &plannercore.PreprocessorReturn{}
nodeW := resolve.NewNodeW(a.StmtNode)
if err := plannercore.Preprocess(ctx, a.Ctx, nodeW, plannercore.InTxnRetry, plannercore.InitTxnContextProvider, plannercore.WithPreprocessorReturn(ret)); err != nil {
return 0, err
}
failpoint.Inject("assertTxnManagerInRebuildPlan", func() {
if is, ok := a.Ctx.Value(sessiontxn.AssertTxnInfoSchemaAfterRetryKey).(infoschema.InfoSchema); ok {
a.Ctx.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is)
a.Ctx.SetValue(sessiontxn.AssertTxnInfoSchemaAfterRetryKey, nil)
}
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInRebuildPlan", true)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, ret.InfoSchema)
staleread.AssertStmtStaleness(a.Ctx, ret.IsStaleness)
if ret.IsStaleness {
sessiontxn.AssertTxnManagerReadTS(a.Ctx, ret.LastSnapshotTS)
}
})
a.InfoSchema = sessiontxn.GetTxnManager(a.Ctx).GetTxnInfoSchema()
replicaReadScope := sessiontxn.GetTxnManager(a.Ctx).GetReadReplicaScope()
if a.Ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && replicaReadScope == kv.GlobalReplicaScope {
logutil.BgLogger().Warn(fmt.Sprintf("tidb can't read closest replicas due to it haven't %s label", placement.DCLabelKey))
}
p, names, err := planner.Optimize(ctx, a.Ctx, nodeW, a.InfoSchema)
if err != nil {
return 0, err
}
a.OutputNames = names
a.Plan = p
a.Ctx.GetSessionVars().StmtCtx.SetPlan(p)
return a.InfoSchema.SchemaMetaVersion(), nil
}
// IsFastPlan exports for testing.
func IsFastPlan(p base.Plan) bool {
if proj, ok := p.(*physicalop.PhysicalProjection); ok {
p = proj.Children()[0]
}
switch p.(type) {
case *physicalop.PointGetPlan:
return true
case *physicalop.PhysicalTableDual:
// Plan of following SQL is PhysicalTableDual:
// select 1;
// select @@autocommit;
return true
case *plannercore.Set:
// Plan of following SQL is Set:
// set @a=1;
// set @@autocommit=1;
return true
}
return false
}
// Exec builds an Executor from a plan. If the Executor doesn't return a result,
// like INSERT and UPDATE statements, it executes in this function. If the Executor returns
// a result, execution happens after this function returns, in the returned sqlexec.RecordSet's Next method.
func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
defer func() {
r := recover()
if r == nil {
if a.retryCount > 0 {
metrics.StatementPessimisticRetryCount.Observe(float64(a.retryCount))
}
execDetails := a.Ctx.GetSessionVars().StmtCtx.GetExecDetails()
if execDetails.LockKeysDetail != nil {
if execDetails.LockKeysDetail.LockKeys > 0 {
metrics.StatementLockKeysCount.Observe(float64(execDetails.LockKeysDetail.LockKeys))
}
if a.Ctx.GetSessionVars().StmtCtx.PessimisticLockStarted() && execDetails.LockKeysDetail.TotalTime > 0 {
metrics.PessimisticLockKeysDuration.Observe(execDetails.LockKeysDetail.TotalTime.Seconds())
}
}
if err == nil && execDetails.LockKeysDetail != nil &&
(execDetails.LockKeysDetail.AggressiveLockNewCount > 0 || execDetails.LockKeysDetail.AggressiveLockDerivedCount > 0) {
a.Ctx.GetSessionVars().TxnCtx.FairLockingUsed = true
// If this statement is finished when some of the keys are locked with conflict in the last retry, or
// some of the keys are derived from the previous retry, we consider the optimization of fair locking
// takes effect on this statement.
if execDetails.LockKeysDetail.LockedWithConflictCount > 0 || execDetails.LockKeysDetail.AggressiveLockDerivedCount > 0 {
a.Ctx.GetSessionVars().TxnCtx.FairLockingEffective = true
}
}
return
}
recoveredErr, ok := r.(error)
if !ok || !(exeerrors.ErrMemoryExceedForQuery.Equal(recoveredErr) ||
exeerrors.ErrMemoryExceedForInstance.Equal(recoveredErr) ||
exeerrors.ErrQueryInterrupted.Equal(recoveredErr) ||
exeerrors.ErrMaxExecTimeExceeded.Equal(recoveredErr)) {
panic(r)
}
err = recoveredErr
logutil.Logger(ctx).Warn("execute sql panic", zap.String("sql", a.GetTextToLog(false)), zap.Stack("stack"))
}()
failpoint.Inject("assertStaleTSO", func(val failpoint.Value) {
if n, ok := val.(int); ok && staleread.IsStmtStaleness(a.Ctx) {
txnManager := sessiontxn.GetTxnManager(a.Ctx)
ts, err := txnManager.GetStmtReadTS()
if err != nil {
panic(err)
}
startTS := oracle.ExtractPhysical(ts) / 1000
if n != int(startTS) {
panic(fmt.Sprintf("different tso %d != %d", n, startTS))
}
}
})
sctx := a.Ctx
ctx = util.SetSessionID(ctx, sctx.GetSessionVars().ConnectionID)
if _, ok := a.Plan.(*plannercore.Analyze); ok && sctx.GetSessionVars().InRestrictedSQL {
oriStats, ok := sctx.GetSessionVars().GetSystemVar(vardef.TiDBBuildStatsConcurrency)
if !ok {
oriStats = strconv.Itoa(vardef.DefBuildStatsConcurrency)
}
oriScan := sctx.GetSessionVars().AnalyzeDistSQLScanConcurrency()
oriIso, ok := sctx.GetSessionVars().GetSystemVar(vardef.TxnIsolation)
if !ok {
oriIso = "REPEATABLE-READ"
}
autoConcurrency, err1 := sctx.GetSessionVars().GetSessionOrGlobalSystemVar(ctx, vardef.TiDBAutoBuildStatsConcurrency)
terror.Log(err1)
if err1 == nil {
terror.Log(sctx.GetSessionVars().SetSystemVar(vardef.TiDBBuildStatsConcurrency, autoConcurrency))
}
sVal, err2 := sctx.GetSessionVars().GetSessionOrGlobalSystemVar(ctx, vardef.TiDBSysProcScanConcurrency)
terror.Log(err2)
if err2 == nil {
concurrency, err3 := strconv.ParseInt(sVal, 10, 64)
terror.Log(err3)
if err3 == nil {
sctx.GetSessionVars().SetAnalyzeDistSQLScanConcurrency(int(concurrency))
}
}
terror.Log(sctx.GetSessionVars().SetSystemVar(vardef.TxnIsolation, ast.ReadCommitted))
defer func() {
terror.Log(sctx.GetSessionVars().SetSystemVar(vardef.TiDBBuildStatsConcurrency, oriStats))
sctx.GetSessionVars().SetAnalyzeDistSQLScanConcurrency(oriScan)
terror.Log(sctx.GetSessionVars().SetSystemVar(vardef.TxnIsolation, oriIso))
}()
}
if sctx.GetSessionVars().StmtCtx.HasMemQuotaHint {
sctx.GetSessionVars().MemTracker.SetBytesLimit(sctx.GetSessionVars().StmtCtx.MemQuotaQuery)
}
// must set plan according to the `Execute` plan before getting planDigest
a.inheritContextFromExecuteStmt()
var rm *runaway.Manager
dom := domain.GetDomain(sctx)
if dom != nil {
rm = dom.RunawayManager()
}
if vardef.EnableResourceControl.Load() && rm != nil {
sessionVars := sctx.GetSessionVars()
stmtCtx := sessionVars.StmtCtx
_, planDigest := GetPlanDigest(stmtCtx)
_, sqlDigest := stmtCtx.SQLDigest()
stmtCtx.RunawayChecker = rm.DeriveChecker(stmtCtx.ResourceGroupName, stmtCtx.OriginalSQL, sqlDigest.String(), planDigest.String(), sessionVars.StartTime)
switchGroupName, err := stmtCtx.RunawayChecker.BeforeExecutor()
if err != nil {
return nil, err
}
if len(switchGroupName) > 0 {
stmtCtx.ResourceGroupName = switchGroupName
}
}
ctx = a.observeStmtBeginForTopSQL(ctx)
// Record start time before buildExecutor() to include TSO waiting time in maxExecutionTime timeout.
// buildExecutor() may block waiting for TSO, so we should start the timer earlier.
cmd32 := atomic.LoadUint32(&sctx.GetSessionVars().CommandValue)
cmd := byte(cmd32)
var execStartTime time.Time
var pi processinfoSetter
if raw, ok := sctx.(processinfoSetter); ok {
pi = raw
execStartTime = time.Now()
}
e, err := a.buildExecutor()
if err != nil {
return nil, err
}
if pi != nil {
sql := a.getSQLForProcessInfo()
maxExecutionTime := sctx.GetSessionVars().GetMaxExecutionTime()
// Update processinfo, ShowProcess() will use it.
if a.Ctx.GetSessionVars().StmtCtx.StmtType == "" {
a.Ctx.GetSessionVars().StmtCtx.StmtType = stmtctx.GetStmtLabel(ctx, a.StmtNode)
}
// Since maxExecutionTime is used only for SELECT statements, here we limit its scope.
if !a.Ctx.GetSessionVars().StmtCtx.InSelectStmt {
maxExecutionTime = 0
}
pi.SetProcessInfo(sql, execStartTime, cmd, maxExecutionTime)
}
breakpoint.Inject(a.Ctx, sessiontxn.BreakPointBeforeExecutorFirstRun)
if err = a.openExecutor(ctx, e); err != nil {
terror.Log(exec.Close(e))
return nil, err
}
isPessimistic := sctx.GetSessionVars().TxnCtx.IsPessimistic
if a.isSelectForUpdate {
if sctx.GetSessionVars().UseLowResolutionTSO() {
terror.Log(exec.Close(e))
return nil, errors.New("can not execute select for update statement when 'tidb_low_resolution_tso' is set")
}
// Special handle for "select for update statement" in pessimistic transaction.
if isPessimistic {
return a.handlePessimisticSelectForUpdate(ctx, e)
}
}
a.prepareFKCascadeContext(e)
if handled, result, err := a.handleNoDelay(ctx, e, isPessimistic); handled || err != nil {
return result, err
}
var txnStartTS uint64
txn, err := sctx.Txn(false)
if err != nil {
terror.Log(exec.Close(e))
return nil, err
}
if txn.Valid() {
txnStartTS = txn.StartTS()
}
// Extract trace ID from context to store in recordSet for lazy execution
traceID := tikvtrace.TraceIDFromContext(ctx)
return &recordSet{
executor: e,
schema: e.Schema(),
stmt: a,
txnStartTS: txnStartTS,
traceID: traceID,
}, nil
}
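// inheritContextFromExecuteStmt unwraps an `Execute` plan: it copies the query
// text, output names and the inner plan of the prepared statement into the
// ExecStmt so that later phases work on the real plan.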
func (a *ExecStmt) inheritContextFromExecuteStmt() {
if executePlan, ok := a.Plan.(*plannercore.Execute); ok {
a.Ctx.SetValue(sessionctx.QueryString, executePlan.Stmt.Text())
a.OutputNames = executePlan.OutputNames()
a.isPreparedStmt = true
a.Plan = executePlan.Plan
a.Ctx.GetSessionVars().StmtCtx.SetPlan(executePlan.Plan)
}
}
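// getSQLForProcessInfo returns the statement text shown in the processlist,
// using SecureText for sensitive statements so that passwords are not leaked.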
func (a *ExecStmt) getSQLForProcessInfo() string {
sql := a.Text()
if simple, ok := a.Plan.(*plannercore.Simple); ok && simple.Statement != nil {
if ss, ok := simple.Statement.(ast.SensitiveStmtNode); ok {
// Use SecureText to avoid leaking password information.
sql = ss.SecureText()
}
} else if sn, ok2 := a.StmtNode.(ast.SensitiveStmtNode); ok2 {
// For example, the IMPORT INTO statement.
sql = sn.SecureText()
}
return sql
}
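// handleStmtForeignKeyTrigger runs the foreign key checks and cascades that
// the executed statement has triggered. On failure it rolls back to the FK
// savepoint; on success it releases the savepoint recorded in
// prepareFKCascadeContext.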
func (a *ExecStmt) handleStmtForeignKeyTrigger(ctx context.Context, e exec.Executor) error {
stmtCtx := a.Ctx.GetSessionVars().StmtCtx
if stmtCtx.ForeignKeyTriggerCtx.HasFKCascades {
// If the ExecStmt has foreign key cascades to be executed, we need to call `StmtCommit` to commit the ExecStmt's
// own changes first.
// Since `UnionScanExec` uses `SnapshotIter` and `SnapshotGetter` to read the txn mem-buffer, if we don't do `StmtCommit`,
// the fk cascade executor can't read the mem-buffer changes made by the ExecStmt.
a.Ctx.StmtCommit(ctx)
}
err := a.handleForeignKeyTrigger(ctx, e, 1)
if err != nil {
err1 := a.handleFKTriggerError(stmtCtx)
if err1 != nil {
return errors.Errorf("handle foreign key trigger error failed, err: %v, original_err: %v", err1, err)
}
return err
}
if stmtCtx.ForeignKeyTriggerCtx.SavepointName != "" {
a.Ctx.GetSessionVars().TxnCtx.ReleaseSavepoint(stmtCtx.ForeignKeyTriggerCtx.SavepointName)
}
return nil
}
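// maxForeignKeyCascadeDepth is the maximum depth of foreign key cascades before
// ErrForeignKeyCascadeDepthExceeded is reported.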
var maxForeignKeyCascadeDepth = 15
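// handleForeignKeyTrigger runs the FK checks and FK cascades attached to the
// given executor, recursing through cascaded executors; the recursion depth is
// bounded by maxForeignKeyCascadeDepth.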
func (a *ExecStmt) handleForeignKeyTrigger(ctx context.Context, e exec.Executor, depth int) error {
exec, ok := e.(WithForeignKeyTrigger)
if !ok {
return nil
}
fkChecks := exec.GetFKChecks()
for _, fkCheck := range fkChecks {
err := fkCheck.doCheck(ctx)
if err != nil {
return err
}
}
fkCascades := exec.GetFKCascades()
for _, fkCascade := range fkCascades {
err := a.handleForeignKeyCascade(ctx, fkCascade, depth)
if err != nil {
return err
}
}
return nil
}
// handleForeignKeyCascade executes the foreign key cascade behaviour. The process is:
// 1. Build a delete/update executor for the foreign key on-delete/on-update behaviour.
// a. Construct the delete/update AST. We used to generate a SQL string first and then parse it to get the AST,
// but that needs converting Datum to string, which carries some risk since assert_eq(datum_a, parse(datum_a.toString())) may not hold,
// so we chose to construct the AST directly.
// b. Build the plan from the delete/update AST.
// c. Build the executor from the delete/update plan.
// 2. Execute the delete/update executor.
// 3. Close the executor.
// 4. `StmtCommit` to commit the kv changes to the transaction mem-buffer.
// 5. If the foreign key cascade behaviour has more fk values that need to be cascaded, go to step 1.
func (a *ExecStmt) handleForeignKeyCascade(ctx context.Context, fkc *FKCascadeExec, depth int) error {
if a.Ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
fkc.stats = &FKCascadeRuntimeStats{}
defer a.Ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(fkc.plan.ID(), fkc.stats)
}
if len(fkc.fkValues) == 0 && len(fkc.fkUpdatedValuesMap) == 0 {
return nil
}
if depth > maxForeignKeyCascadeDepth {
return exeerrors.ErrForeignKeyCascadeDepthExceeded.GenWithStackByArgs(maxForeignKeyCascadeDepth)
}
a.Ctx.GetSessionVars().StmtCtx.InHandleForeignKeyTrigger = true
defer func() {
a.Ctx.GetSessionVars().StmtCtx.InHandleForeignKeyTrigger = false
}()
if fkc.stats != nil {
start := time.Now()
defer func() {
fkc.stats.Total += time.Since(start)
}()
}
for {
e, err := fkc.buildExecutor(ctx)
if err != nil || e == nil {
return err
}
if err := exec.Open(ctx, e); err != nil {
terror.Log(exec.Close(e))
return err
}
err = exec.Next(ctx, e, exec.NewFirstChunk(e))
failpoint.Inject("handleForeignKeyCascadeError", func(val failpoint.Value) {
// Next can recover from a panic and convert it to an error, so we inject the error directly here.
if val.(bool) && err == nil {
err = errors.New("handleForeignKeyCascadeError")
}
})
closeErr := exec.Close(e)
if err == nil {
err = closeErr
}
if err != nil {
return err
}
// Call `StmtCommit` to flush the fk cascade executor's changes into the txn mem-buffer,
// so that the later fk cascade executors can see the mem-buffer changes.
a.Ctx.StmtCommit(ctx)
err = a.handleForeignKeyTrigger(ctx, e, depth+1)
if err != nil {
return err
}
}
}
// prepareFKCascadeContext records a transaction savepoint for foreign key cascades when this ExecStmt has foreign key
// cascade behaviour and is in a transaction.
func (a *ExecStmt) prepareFKCascadeContext(e exec.Executor) {
var execWithFKTrigger WithForeignKeyTrigger
if explain, ok := e.(*ExplainExec); ok {
execWithFKTrigger = explain.getAnalyzeExecWithForeignKeyTrigger()
} else {
execWithFKTrigger, _ = e.(WithForeignKeyTrigger)
}
if execWithFKTrigger == nil || !execWithFKTrigger.HasFKCascades() {
return
}
sessVar := a.Ctx.GetSessionVars()
sessVar.StmtCtx.ForeignKeyTriggerCtx.HasFKCascades = true
if !sessVar.InTxn() {
return
}
txn, err := a.Ctx.Txn(false)
if err != nil || !txn.Valid() {
return
}
// Record a txn savepoint if the ExecStmt is in a transaction; the savepoint is used to roll back when handling the
// foreign key cascade fails.
savepointName := "fk_sp_" + strconv.FormatUint(txn.StartTS(), 10)
memDBCheckpoint := txn.GetMemDBCheckpoint()
sessVar.TxnCtx.AddSavepoint(savepointName, memDBCheckpoint)
sessVar.StmtCtx.ForeignKeyTriggerCtx.SavepointName = savepointName
}
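// handleFKTriggerError rolls the transaction mem-buffer back to the savepoint
// recorded before the FK cascade started. If the savepoint record cannot be
// found, the whole transaction is rolled back as a last resort.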
func (a *ExecStmt) handleFKTriggerError(sc *stmtctx.StatementContext) error {
if sc.ForeignKeyTriggerCtx.SavepointName == "" {
return nil
}
txn, err := a.Ctx.Txn(false)
if err != nil || !txn.Valid() {
return err
}
savepointRecord := a.Ctx.GetSessionVars().TxnCtx.RollbackToSavepoint(sc.ForeignKeyTriggerCtx.SavepointName)
if savepointRecord == nil {
// Normally we should never get here, but just in case, roll back the transaction.
err = txn.Rollback()
if err != nil {
return err
}
return errors.Errorf("foreign key cascade savepoint '%s' not found, transaction is rollback, should never happen", sc.ForeignKeyTriggerCtx.SavepointName)
}
txn.RollbackMemDBToCheckpoint(savepointRecord.MemDBCheckpoint)
a.Ctx.GetSessionVars().TxnCtx.ReleaseSavepoint(sc.ForeignKeyTriggerCtx.SavepointName)
return nil
}
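// handleNoDelay executes statements that return no result to the client (such
// as INSERT/UPDATE/DELETE, or EXPLAIN ANALYZE on a DML) immediately instead of
// lazily through the returned RecordSet. It reports whether the statement has
// been handled here.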
func (a *ExecStmt) handleNoDelay(ctx context.Context, e exec.Executor, isPessimistic bool) (handled bool, rs sqlexec.RecordSet, err error) {
sc := a.Ctx.GetSessionVars().StmtCtx
defer func() {
// If the stmt has no result set, like `insert`, the session tracker detachment is done directly
// in this `defer` function. If the rs is not nil, the detachment will be done in
// `rs.Close` in `handleStmt`.
if handled && sc != nil && rs == nil {
sc.DetachMemDiskTracker()
cteErr := resetCTEStorageMap(a.Ctx)
if err == nil {
// Only overwrite err when it's nil.
err = cteErr
}
}
}()
toCheck := e
isExplainAnalyze := false
if explain, ok := e.(*ExplainExec); ok {
if analyze := explain.getAnalyzeExecToExecutedNoDelay(); analyze != nil {
toCheck = analyze
isExplainAnalyze = true
a.Ctx.GetSessionVars().StmtCtx.IsExplainAnalyzeDML = isExplainAnalyze
}
}
// If the executor doesn't return any result to the client, we execute it without delay.
if toCheck.Schema().Len() == 0 {
handled = !isExplainAnalyze
if isPessimistic {
err := a.handlePessimisticDML(ctx, toCheck)
return handled, nil, err
}
r, err := a.handleNoDelayExecutor(ctx, toCheck)
return handled, r, err
} else if proj, ok := toCheck.(*ProjectionExec); ok && proj.calculateNoDelay {
// Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example:
// the Projection has two expressions and two columns in the schema, but we should
// not return the result of the two expressions.
r, err := a.handleNoDelayExecutor(ctx, e)
return true, r, err
}
return false, nil, nil
}
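// isNoResultPlan reports whether the plan produces no result for the client,
// i.e. its schema is empty or it is a no-delay projection such as the one built
// for the "DO" statement.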
func isNoResultPlan(p base.Plan) bool {
if p.Schema().Len() == 0 {
return true
}
// Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example:
// the Projection has two expressions and two columns in the schema, but we should
// not return the result of the two expressions.
switch raw := p.(type) {
case *logicalop.LogicalProjection:
if raw.CalculateNoDelay {
return true
}
case *physicalop.PhysicalProjection:
if raw.CalculateNoDelay {
return true
}
}
return false
}
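// chunkRowRecordSet is a RecordSet backed by rows that were already fetched and
// buffered in memory; it is used by the pessimistic SELECT FOR UPDATE path to
// return rows after the locks have been acquired.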
type chunkRowRecordSet struct {
rows []chunk.Row
idx int
fields []*resolve.ResultField
e exec.Executor
execStmt *ExecStmt
}
func (c *chunkRowRecordSet) Fields() []*resolve.ResultField {
if c.fields == nil {
c.fields = colNames2ResultFields(c.e.Schema(), c.execStmt.OutputNames, c.execStmt.Ctx.GetSessionVars().CurrentDB)
}
return c.fields
}
func (c *chunkRowRecordSet) Next(_ context.Context, chk *chunk.Chunk) error {
chk.Reset()
if !chk.IsFull() && c.idx < len(c.rows) {
numToAppend := min(len(c.rows)-c.idx, chk.RequiredRows()-chk.NumRows())
chk.AppendRows(c.rows[c.idx : c.idx+numToAppend])
c.idx += numToAppend
}
return nil
}
func (c *chunkRowRecordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
if alloc == nil {
return exec.NewFirstChunk(c.e)
}
return alloc.Alloc(c.e.RetFieldTypes(), c.e.InitCap(), c.e.MaxChunkSize())
}
func (c *chunkRowRecordSet) Close() error {
c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil)
return nil
}
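// handlePessimisticSelectForUpdate executes a SELECT FOR UPDATE statement in a
// pessimistic transaction. The result rows are fully buffered before being
// returned, and the statement is retried on pessimistic lock errors.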
func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e exec.Executor) (_ sqlexec.RecordSet, retErr error) {
if snapshotTS := a.Ctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
terror.Log(exec.Close(e))
return nil, errors.New("can not execute write statement when 'tidb_snapshot' is set")
}
txnManager := sessiontxn.GetTxnManager(a.Ctx)
err := txnManager.OnPessimisticStmtStart(ctx)
if err != nil {
return nil, err
}
defer func() {
isSuccessful := retErr == nil
err1 := txnManager.OnPessimisticStmtEnd(ctx, isSuccessful)
if retErr == nil && err1 != nil {
retErr = err1
}
}()
isFirstAttempt := true
for {
startTime := time.Now()
rs, err := a.runPessimisticSelectForUpdate(ctx, e)
if isFirstAttempt {
executor_metrics.SelectForUpdateFirstAttemptDuration.Observe(time.Since(startTime).Seconds())
isFirstAttempt = false
} else {
executor_metrics.SelectForUpdateRetryDuration.Observe(time.Since(startTime).Seconds())
}
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
return nil, err
}
if e == nil {
return rs, nil
}
failpoint.Inject("pessimisticSelectForUpdateRetry", nil)
}
}
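// runPessimisticSelectForUpdate drains the executor into memory and wraps the
// buffered rows in a chunkRowRecordSet. A lock error such as a write conflict
// is returned to the caller for retry.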
func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e exec.Executor) (sqlexec.RecordSet, error) {
defer func() {
terror.Log(exec.Close(e))
}()
var rows []chunk.Row
var err error
req := exec.TryNewCacheChunk(e)
for {
err = a.next(ctx, e, req)
if err != nil {
// Handle 'write conflict' error.
break
}
if req.NumRows() == 0 {
return &chunkRowRecordSet{rows: rows, e: e, execStmt: a}, nil
}
iter := chunk.NewIterator4Chunk(req)
for r := iter.Begin(); r != iter.End(); r = iter.Next() {
rows = append(rows, r)
}
req = chunk.Renew(req, a.Ctx.GetSessionVars().MaxChunkSize)
}
return nil, err
}
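// handleNoDelayExecutor drives the executor to completion right away. Write
// executors are rejected when 'tidb_snapshot' or 'tidb_low_resolution_tso' is
// set, and foreign key triggers are handled after the executor finishes.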
func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e exec.Executor) (sqlexec.RecordSet, error) {
sctx := a.Ctx
r, ctx := tracing.StartRegionEx(ctx, "executor.handleNoDelayExecutor")
defer r.End()
var err error
defer func() {
terror.Log(exec.Close(e))
a.logAudit()
}()
// Check if "tidb_snapshot" is set for the write executors.
// In history read mode, we can not do write operations.
// TODO: it's better to use a.ReadOnly to check if the statement is a write statement
// instead of listing executor types here.
switch e.(type) {
case *DeleteExec, *InsertExec, *UpdateExec, *ReplaceExec, *LoadDataExec, *DDLExec, *ImportIntoExec:
snapshotTS := sctx.GetSessionVars().SnapshotTS
if snapshotTS != 0 {
return nil, errors.New("can not execute write statement when 'tidb_snapshot' is set")
}
if sctx.GetSessionVars().UseLowResolutionTSO() {
return nil, errors.New("can not execute write statement when 'tidb_low_resolution_tso' is set")
}
}
err = a.next(ctx, e, exec.TryNewCacheChunk(e))
if err != nil {
return nil, err
}
err = a.handleStmtForeignKeyTrigger(ctx, e)
return nil, err
}
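// handlePessimisticDML executes a DML statement in a pessimistic transaction:
// it runs the executor, collects the keys that need to be locked, locks them,
// and retries the statement on pessimistic lock errors such as write conflicts.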
func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e exec.Executor) (err error) {
sctx := a.Ctx
// Do not activate the transaction here.
// When autocommit = 0 and the transaction is in pessimistic mode,
// statements like `set xxx = xxx;` should not activate the transaction.
txn, err := sctx.Txn(false)
if err != nil {
return err
}
txnCtx := sctx.GetSessionVars().TxnCtx
defer func() {
if err != nil && !sctx.GetSessionVars().ConstraintCheckInPlacePessimistic && sctx.GetSessionVars().InTxn() {
// If it's not a retryable error, roll back the current transaction instead of rolling back the current statement as
// in normal transactions, because we cannot locate and roll back the statement that led to the lock error.
// This is too strict, but since the feature is not for everyone, it's the easiest way to guarantee safety.
stmtText := parser.Normalize(a.Text(), sctx.GetSessionVars().EnableRedactLog)
logutil.Logger(ctx).Info("Transaction abort for the safety of lazy uniqueness check. "+
"Note this may not be a uniqueness violation.",
zap.Error(err),
zap.String("statement", stmtText),
zap.Uint64("conn", sctx.GetSessionVars().ConnectionID),
zap.Uint64("txnStartTS", txnCtx.StartTS),
zap.Uint64("forUpdateTS", txnCtx.GetForUpdateTS()),
)
sctx.GetSessionVars().SetInTxn(false)
err = exeerrors.ErrLazyUniquenessCheckFailure.GenWithStackByArgs(err.Error())
}
}()
txnManager := sessiontxn.GetTxnManager(a.Ctx)
err = txnManager.OnPessimisticStmtStart(ctx)
if err != nil {
return err
}
defer func() {
isSuccessful := err == nil
err1 := txnManager.OnPessimisticStmtEnd(ctx, isSuccessful)
if err == nil && err1 != nil {
err = err1
}
}()
isFirstAttempt := true
for {
if !isFirstAttempt {
failpoint.Inject("pessimisticDMLRetry", nil)
}
startTime := time.Now()
_, err = a.handleNoDelayExecutor(ctx, e)
if !txn.Valid() {
return err
}
if isFirstAttempt {
executor_metrics.DmlFirstAttemptDuration.Observe(time.Since(startTime).Seconds())
isFirstAttempt = false
} else {
executor_metrics.DmlRetryDuration.Observe(time.Since(startTime).Seconds())
}
if err != nil {
// It is possible the DML has a point get plan that locks the key.
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
if exeerrors.ErrDeadlock.Equal(err) {
metrics.StatementDeadlockDetectDuration.Observe(time.Since(startTime).Seconds())
}
return err
}
continue
}
keys, err1 := txn.(pessimisticTxn).KeysNeedToLock()
if err1 != nil {
return err1
}
keys = txnCtx.CollectUnchangedKeysForLock(keys)
if len(keys) == 0 {
return nil
}
keys = filterTemporaryTableKeys(sctx.GetSessionVars(), keys)
seVars := sctx.GetSessionVars()
keys = filterLockTableKeys(seVars.StmtCtx, keys)
lockCtx, err := newLockCtx(sctx, seVars.LockWaitTimeout, len(keys))
if err != nil {
return err
}
var lockKeyStats *util.LockKeysDetails
ctx = context.WithValue(ctx, util.LockKeysDetailCtxKey, &lockKeyStats)
startLocking := time.Now()
err = txn.LockKeys(ctx, lockCtx, keys...)
a.phaseLockDurations[0] += time.Since(startLocking)
if e.RuntimeStats() != nil {
e.RuntimeStats().Record(time.Since(startLocking), 0)
}
if lockKeyStats != nil {
seVars.StmtCtx.MergeLockKeysExecDetails(lockKeyStats)
}
if err == nil {
return nil
}
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
// todo: Report deadlock
if exeerrors.ErrDeadlock.Equal(err) {
metrics.StatementDeadlockDetectDuration.Observe(time.Since(startLocking).Seconds())
}
return err
}
}
}
// handlePessimisticLockError updates the TS and rebuilds the executor if the error is a write conflict.
func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error) (_ exec.Executor, err error) {
if lockErr == nil {
return nil, nil
}
failpoint.Inject("assertPessimisticLockErr", func() {
if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) {
sessiontxn.AddAssertEntranceForLockError(a.Ctx, "errWriteConflict")
} else if terror.ErrorEqual(kv.ErrKeyExists, lockErr) {
sessiontxn.AddAssertEntranceForLockError(a.Ctx, "errDuplicateKey")
}
})
defer func() {
if _, ok := errors.Cause(err).(*tikverr.ErrDeadlock); ok {
err = exeerrors.ErrDeadlock
}
}()
txnManager := sessiontxn.GetTxnManager(a.Ctx)
action, err := txnManager.OnStmtErrorForNextAction(ctx, sessiontxn.StmtErrAfterPessimisticLock, lockErr)
if err != nil {
return nil, err
}
if action != sessiontxn.StmtActionRetryReady {
return nil, lockErr
}
if a.retryCount >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount {
return nil, errors.New("pessimistic lock retry limit reached")
}
a.retryCount++
a.retryStartTime = time.Now()
err = txnManager.OnStmtRetry(ctx)
if err != nil {
return nil, err
}
// Without this line of code, the result would still be correct, but this ensures that the for-update read TS
// is determined, which is beneficial for testing.
if _, err = txnManager.GetStmtForUpdateTS(); err != nil {
return nil, err
}
breakpoint.Inject(a.Ctx, sessiontxn.BreakPointOnStmtRetryAfterLockError)
a.resetPhaseDurations()
a.inheritContextFromExecuteStmt()
e, err := a.buildExecutor()
if err != nil {
return nil, err
}
// Roll back the statement changes before retrying it.
a.Ctx.StmtRollback(ctx, true)
a.Ctx.GetSessionVars().StmtCtx.ResetForRetry()
a.Ctx.GetSessionVars().RetryInfo.ResetOffset()
failpoint.Inject("assertTxnManagerAfterPessimisticLockErrorRetry", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterPessimisticLockErrorRetry", true)
})
if err = a.openExecutor(ctx, e); err != nil {
return nil, err
}
return e, nil
}
type pessimisticTxn interface {
kv.Transaction
// KeysNeedToLock returns the keys need to be locked.
KeysNeedToLock() ([]kv.Key, error)
}
// buildExecutor builds an executor from the plan; a prepared statement may need an additional procedure.
func (a *ExecStmt) buildExecutor() (exec.Executor, error) {
defer func(start time.Time) { a.phaseBuildDurations[0] += time.Since(start) }(time.Now())
ctx := a.Ctx
stmtCtx := ctx.GetSessionVars().StmtCtx
if _, ok := a.Plan.(*plannercore.Execute); !ok {
if stmtCtx.Priority == mysql.NoPriority && a.LowerPriority {
stmtCtx.Priority = kv.PriorityLow
}
}
if _, ok := a.Plan.(*plannercore.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}
b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti)
e := b.build(a.Plan)
if b.err != nil {
return nil, errors.Trace(b.err)
}
failpoint.Inject("assertTxnManagerAfterBuildExecutor", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterBuildExecutor", true)
sessiontxn.AssertTxnManagerInfoSchema(b.ctx, b.is)
})
// ExecuteExec is not a real Executor, we only use it to build another Executor from a prepared statement.
if executorExec, ok := e.(*ExecuteExec); ok {
err := executorExec.Build(b)
if err != nil {
return nil, err
}
if executorExec.lowerPriority {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}
e = executorExec.stmtExec
}
a.isSelectForUpdate = b.hasLock && (!stmtCtx.InDeleteStmt && !stmtCtx.InUpdateStmt && !stmtCtx.InInsertStmt)
return e, nil
}
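// openExecutor opens the executor and accounts the elapsed time into the open
// phase duration, recovering from panics raised inside Open.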
func (a *ExecStmt) openExecutor(ctx context.Context, e exec.Executor) (err error) {
defer func() {
if r := recover(); r != nil {
err = util2.GetRecoverError(r)
}
}()
start := time.Now()
err = exec.Open(ctx, e)
a.phaseOpenDurations[0] += time.Since(start)
return err
}
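// next calls exec.Next and accounts the elapsed time into the next phase
// duration.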
func (a *ExecStmt) next(ctx context.Context, e exec.Executor, req *chunk.Chunk) error {
start := time.Now()
err := exec.Next(ctx, e, req)
a.phaseNextDurations[0] += time.Since(start)
return err
}
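// resetPhaseDurations moves the durations accumulated in the current retry
// iteration into the "locking" buckets and clears them before the next attempt.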
func (a *ExecStmt) resetPhaseDurations() {
a.phaseBuildDurations[1] += a.phaseBuildDurations[0]
a.phaseBuildDurations[0] = 0
a.phaseOpenDurations[1] += a.phaseOpenDurations[0]
a.phaseOpenDurations[0] = 0
a.phaseNextDurations[1] += a.phaseNextDurations[0]
a.phaseNextDurations[0] = 0
a.phaseLockDurations[1] += a.phaseLockDurations[0]
a.phaseLockDurations[0] = 0
}
// QueryReplacer replaces newlines and tabs so that grep results that include the query string stay on one line.
var QueryReplacer = strings.NewReplacer("\r", " ", "\n", " ", "\t", " ")
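// logAudit notifies the registered audit plugins that the statement has
// completed. Internal (restricted) SQL is skipped.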
func (a *ExecStmt) logAudit() {
sessVars := a.Ctx.GetSessionVars()
if sessVars.InRestrictedSQL {
return
}
err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
audit := plugin.DeclareAuditManifest(p.Manifest)
if audit.OnGeneralEvent != nil {
cmdBin := byte(atomic.LoadUint32(&a.Ctx.GetSessionVars().CommandValue))
cmd := mysql.Command2Str[cmdBin]
ctx := context.WithValue(context.Background(), plugin.ExecStartTimeCtxKey, a.Ctx.GetSessionVars().StartTime)
if execStmt, ok := a.StmtNode.(*ast.ExecuteStmt); ok {
ctx = context.WithValue(ctx, plugin.PrepareStmtIDCtxKey, execStmt.PrepStmtId)
}
ctx = context.WithValue(ctx, plugin.IsRetryingCtxKey, a.retryCount > 0 || sessVars.RetryInfo.Retrying)
if intest.InTest && (cmdBin == mysql.ComStmtPrepare ||
cmdBin == mysql.ComStmtExecute || cmdBin == mysql.ComStmtClose) {
intest.Assert(ctx.Value(plugin.PrepareStmtIDCtxKey) != nil, "prepare statement id should not be nil")
}
audit.OnGeneralEvent(ctx, sessVars, plugin.Completed, cmd)
}
return nil
})
if err != nil {
log.Error("log audit log failure", zap.Error(err))
}
}
// FormatSQL is used to format the original SQL, e.g. truncating long SQL, appending prepared arguments.
func FormatSQL(sql string) stringutil.StringerFunc {
return func() string { return formatSQL(sql) }
}
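// formatSQL flattens line breaks and tabs in the SQL text and truncates it to
// vardef.QueryLogMaxLen, appending the original length. For example, with a
// limit of 10, "select * from t" becomes "select * f(len:15)".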
func formatSQL(sql string) string {
length := len(sql)
maxQueryLen := vardef.QueryLogMaxLen.Load()
if maxQueryLen <= 0 {
return QueryReplacer.Replace(sql) // no limit
}
if int32(length) > maxQueryLen {
var result strings.Builder
result.Grow(int(maxQueryLen))
result.WriteString(sql[:maxQueryLen])
fmt.Fprintf(&result, "(len:%d)", length)
return QueryReplacer.Replace(result.String())
}
return QueryReplacer.Replace(sql)
}
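// getPhaseDurationObserver returns the prometheus observer for the given
// execution phase, distinguishing internal from general statements.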
func getPhaseDurationObserver(phase string, internal bool) prometheus.Observer {
if internal {
if ob, found := executor_metrics.PhaseDurationObserverMapInternal[phase]; found {
return ob
}
return executor_metrics.ExecUnknownInternal
}
if ob, found := executor_metrics.PhaseDurationObserverMap[phase]; found {
return ob
}
return executor_metrics.ExecUnknown
}
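// observePhaseDurations reports the per-phase durations (build/open/next/lock,
// plus commit sub-phases and response writing) to their prometheus histograms.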
func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.CommitDetails) {
for _, it := range []struct {
duration time.Duration
phase string
}{
{a.phaseBuildDurations[0], executor_metrics.PhaseBuildFinal},
{a.phaseBuildDurations[1], executor_metrics.PhaseBuildLocking},
{a.phaseOpenDurations[0], executor_metrics.PhaseOpenFinal},
{a.phaseOpenDurations[1], executor_metrics.PhaseOpenLocking},
{a.phaseNextDurations[0], executor_metrics.PhaseNextFinal},
{a.phaseNextDurations[1], executor_metrics.PhaseNextLocking},
{a.phaseLockDurations[0], executor_metrics.PhaseLockFinal},
{a.phaseLockDurations[1], executor_metrics.PhaseLockLocking},
} {
if it.duration > 0 {
getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds())
}
}
if commitDetails != nil {
for _, it := range []struct {
duration time.Duration
phase string
}{
{commitDetails.PrewriteTime, executor_metrics.PhaseCommitPrewrite},
{commitDetails.CommitTime, executor_metrics.PhaseCommitCommit},
{commitDetails.GetCommitTsTime, executor_metrics.PhaseCommitWaitCommitTS},
{commitDetails.GetLatestTsTime, executor_metrics.PhaseCommitWaitLatestTS},
{commitDetails.LocalLatchTime, executor_metrics.PhaseCommitWaitLatch},
{commitDetails.WaitPrewriteBinlogTime, executor_metrics.PhaseCommitWaitBinlog},
} {
if it.duration > 0 {
getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds())
}
}
}
if stmtDetailsRaw := a.GoCtx.Value(execdetails.StmtExecDetailKey); stmtDetailsRaw != nil {
d := stmtDetailsRaw.(*execdetails.StmtExecDetails).WriteSQLRespDuration
if d > 0 {
getPhaseDurationObserver(executor_metrics.PhaseWriteResponse, internal).Observe(d.Seconds())
}
}
}
// FinishExecuteStmt is used to record some information after `ExecStmt` execution finished:
// 1. record slow log if needed.
// 2. record summary statement.
// 3. record execute duration metric.
// 4. update the `PrevStmt` in session variable.
// 5. reset `DurationParse` in session variable.
func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults bool) {
a.checkPlanReplayerCapture(txnTS)
sessVars := a.Ctx.GetSessionVars()
execDetail := sessVars.StmtCtx.GetExecDetails()
// Attach commit/lockKeys runtime stats to executor runtime stats.
if (execDetail.CommitDetail != nil || execDetail.LockKeysDetail != nil) && sessVars.StmtCtx.RuntimeStatsColl != nil {
statsWithCommit := &execdetails.RuntimeStatsWithCommit{
Commit: execDetail.CommitDetail,
LockKeys: execDetail.LockKeysDetail,
}
sessVars.StmtCtx.RuntimeStatsColl.RegisterStats(a.Plan.ID(), statsWithCommit)
}
// Record related SLI metrics.
if execDetail.CommitDetail != nil && execDetail.CommitDetail.WriteSize > 0 {
a.Ctx.GetTxnWriteThroughputSLI().AddTxnWriteSize(execDetail.CommitDetail.WriteSize, execDetail.CommitDetail.WriteKeys)
}
if execDetail.ScanDetail != nil && sessVars.StmtCtx.AffectedRows() > 0 {
processedKeys := atomic.LoadInt64(&execDetail.ScanDetail.ProcessedKeys)
if processedKeys > 0 {
// Only record the read keys for write statements that affect more than 0 rows.
a.Ctx.GetTxnWriteThroughputSLI().AddReadKeys(processedKeys)
}
}
succ := err == nil
if a.Plan != nil {
// If this statement has a Plan, the StmtCtx.plan should have been set when it comes here,
// but we set it again in case we missed some code paths.
sessVars.StmtCtx.SetPlan(a.Plan)
}
a.updateNetworkTrafficStatsAndMetrics()
// `LogSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`.
a.LogSlowQuery(txnTS, succ, hasMoreResults)
a.SummaryStmt(succ)
a.observeStmtFinishedForTopSQL()
a.UpdatePlanCacheRuntimeInfo()
if sessVars.StmtCtx.IsTiFlash.Load() {
if succ {
executor_metrics.TotalTiFlashQuerySuccCounter.Inc()
} else {
metrics.TiFlashQueryTotalCounter.WithLabelValues(metrics.ExecuteErrorToLabel(err), metrics.LblError).Inc()
}
}
a.updatePrevStmt()
a.recordLastQueryInfo(err)
a.recordAffectedRows2Metrics()
a.observePhaseDurations(sessVars.InRestrictedSQL, execDetail.CommitDetail)
executeDuration := sessVars.GetExecuteDuration()
if sessVars.InRestrictedSQL {
executor_metrics.SessionExecuteRunDurationInternal.Observe(executeDuration.Seconds())
} else {
executor_metrics.SessionExecuteRunDurationGeneral.Observe(executeDuration.Seconds())
}
// Reset DurationParse because the next statement may not need to be parsed (not a text protocol query).
sessVars.DurationParse = 0
// Clean the stale read flag when statement execution finishes.
sessVars.StmtCtx.IsStaleness = false
// Clean the MPP query info
sessVars.StmtCtx.MPPQueryInfo.QueryID.Store(0)
sessVars.StmtCtx.MPPQueryInfo.QueryTS.Store(0)
sessVars.StmtCtx.MPPQueryInfo.AllocatedMPPTaskID.Store(0)
sessVars.StmtCtx.MPPQueryInfo.AllocatedMPPGatherID.Store(0)
if sessVars.StmtCtx.ReadFromTableCache {
metrics.ReadFromTableCacheCounter.Inc()
}
// Update fair locking related counters by stmt
if execDetail.LockKeysDetail != nil {
if execDetail.LockKeysDetail.AggressiveLockNewCount > 0 || execDetail.LockKeysDetail.AggressiveLockDerivedCount > 0 {
executor_metrics.FairLockingStmtUsedCount.Inc()
// If this statement is finished when some of the keys are locked with conflict in the last retry, or
// some of the keys are derived from the previous retry, we consider the optimization of fair locking
// takes effect on this statement.
if execDetail.LockKeysDetail.LockedWithConflictCount > 0 || execDetail.LockKeysDetail.AggressiveLockDerivedCount > 0 {
executor_metrics.FairLockingStmtEffectiveCount.Inc()
}
}
}
// If the transaction is committed, update fair locking related counters by txn
if execDetail.CommitDetail != nil {
if sessVars.TxnCtx.FairLockingUsed {
executor_metrics.FairLockingTxnUsedCount.Inc()
}
if sessVars.TxnCtx.FairLockingEffective {
executor_metrics.FairLockingTxnEffectiveCount.Inc()
}
}
a.Ctx.ReportUsageStats()
}
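// recordAffectedRows2Metrics adds the statement's affected row count to the
// per-statement-type counter metrics.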
func (a *ExecStmt) recordAffectedRows2Metrics() {
sessVars := a.Ctx.GetSessionVars()
if affectedRows := sessVars.StmtCtx.AffectedRows(); affectedRows > 0 {
switch sessVars.StmtCtx.StmtType {
case "Insert":
metrics.AffectedRowsCounterInsert.Add(float64(affectedRows))
case "Replace":
metrics.AffectedRowsCounterReplace.Add(float64(affectedRows))
case "Delete":
metrics.AffectedRowsCounterDelete.Add(float64(affectedRows))
case "Update":
metrics.AffectedRowsCounterUpdate.Add(float64(affectedRows))
case "NTDML-Delete":
metrics.AffectedRowsCounterNTDMLDelete.Add(float64(affectedRows))
case "NTDML-Update":
metrics.AffectedRowsCounterNTDMLUpdate.Add(float64(affectedRows))
case "NTDML-Insert":
metrics.AffectedRowsCounterNTDMLInsert.Add(float64(affectedRows))
case "NTDML-Replace":
metrics.AffectedRowsCounterNTDMLReplace.Add(float64(affectedRows))
}
}
}
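// recordLastQueryInfo saves the txn scope, start TS, for-update TS, RU
// consumption and error message of the finished statement (DML, EXECUTE, and
// most SHOW statements) into sessVars.LastQueryInfo for later diagnostics.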
func (a *ExecStmt) recordLastQueryInfo(err error) {
sessVars := a.Ctx.GetSessionVars()
// Record diagnostic information for DML statements
recordLastQuery := false
switch typ := a.StmtNode.(type) {
case *ast.ShowStmt:
recordLastQuery = typ.Tp != ast.ShowSessionStates
case *ast.ExecuteStmt, ast.DMLNode:
recordLastQuery = true
}
if recordLastQuery {
var lastRUConsumption float64
if ruDetailRaw := a.GoCtx.Value(util.RUDetailsCtxKey); ruDetailRaw != nil {
ruDetail := ruDetailRaw.(*util.RUDetails)
lastRUConsumption = ruDetail.RRU() + ruDetail.WRU()
}
failpoint.Inject("mockRUConsumption", func(_ failpoint.Value) {
lastRUConsumption = float64(len(sessVars.StmtCtx.OriginalSQL))
})
// Keep the previous queryInfo for `show session_states` because the statement needs to encode it.
sessVars.LastQueryInfo = sessionstates.QueryInfo{
TxnScope: sessVars.CheckAndGetTxnScope(),
StartTS: sessVars.TxnCtx.StartTS,
ForUpdateTS: sessVars.TxnCtx.GetForUpdateTS(),
RUConsumption: lastRUConsumption,
}
if err != nil {
sessVars.LastQueryInfo.ErrMsg = err.Error()
}
}
}
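// checkPlanReplayerCapture hands the finished statement to the plan replayer
// capture check (one-shot task or continuous capture) when the feature is
// enabled; internal statements are skipped.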
func (a *ExecStmt) checkPlanReplayerCapture(txnTS uint64) {
if kv.GetInternalSourceType(a.GoCtx) == kv.InternalTxnStats {
return
}
se := a.Ctx
if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() {
stmtNode := a.GetStmtNode()
if se.GetSessionVars().EnablePlanReplayedContinuesCapture {
if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) {
checkPlanReplayerContinuesCapture(se, stmtNode, txnTS)
}
} else {
checkPlanReplayerCaptureTask(se, stmtNode, txnTS)
}
}
}
// CloseRecordSet will finish the execution of the current statement and do some recording work.
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
a.FinishExecuteStmt(txnStartTS, lastErr, false)
a.logAudit()
a.Ctx.GetSessionVars().StmtCtx.DetachMemDiskTracker()
}
// resetCTEStorageMap cleans the CTE storage shared by different CTEFullScan executors within a SQL stmt.
// It will return an error in two situations:
// 1. An error occurred when removing the disk spill file.
// 2. A logical error, such as the ref count of a CTEStorage being less than 0.
func resetCTEStorageMap(se sessionctx.Context) error {
tmp := se.GetSessionVars().StmtCtx.CTEStorageMap
if tmp == nil {
// Close() is already called, so no need to reset. Such as TraceExec.
return nil
}
storageMap, ok := tmp.(map[int]*CTEStorages)
if !ok {
return errors.New("type assertion for CTEStorageMap failed")
}
for _, v := range storageMap {
v.ResTbl.Lock()
err1 := v.ResTbl.DerefAndClose()
// Make sure we do not hold the lock for longer than necessary.
v.ResTbl.Unlock()
// No need to lock IterInTbl.
err2 := v.IterInTbl.DerefAndClose()
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
}
se.GetSessionVars().StmtCtx.CTEStorageMap = nil
return nil
}
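// slowQueryDumpTriggerCheck reports whether the flight recorder dump trigger is
// configured for slow query events.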
func slowQueryDumpTriggerCheck(config *traceevent.DumpTriggerConfig) bool {
return config.Event.Type == "slow_query"
}
// LogSlowQuery is used to print the slow query in the log files.
func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
sessVars := a.Ctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
cfg := config.GetGlobalConfig()
var slowItems *variable.SlowQueryLogItems
var matchRules bool
if !stmtCtx.WriteSlowLog {
// If the level is Debug, or trace is enabled, print slow logs anyway.
force := log.GetLevel() <= zapcore.DebugLevel || trace.IsEnabled()
if !cfg.Instance.EnableSlowLog.Load() && !force {
return
}
sessVars.StmtCtx.ExecSuccess = succ
sessVars.StmtCtx.ExecRetryCount = uint64(a.retryCount)
globalRules := vardef.GlobalSlowLogRules.Load()
slowItems = PrepareSlowLogItemsForRules(a.GoCtx, globalRules, sessVars)
// EffectiveFields is not empty (the set of fields effective for this session, including the global rules),
// so use these rules to decide whether to write the slow log.
if len(sessVars.SlowLogRules.EffectiveFields) != 0 {
matchRules = ShouldWriteSlowLog(globalRules, sessVars, slowItems)
defer putSlowLogItems(slowItems)
} else {
threshold := time.Duration(atomic.LoadUint64(&cfg.Instance.SlowThreshold)) * time.Millisecond
matchRules = sessVars.GetTotalCostDuration() >= threshold
}
if !matchRules && !force {
return
}
}
if !vardef.GlobalSlowLogRateLimiter.Allow() {
sampleLoggerFactory().Info("slow log skipped due to rate limiting", zap.Int64("tidb_slow_log_max_per_sec", int64(vardef.GlobalSlowLogRateLimiter.Limit())))
return
}
if slowItems == nil {
slowItems = &variable.SlowQueryLogItems{}
}
SetSlowLogItems(a, txnTS, hasMoreResults, slowItems)
failpoint.Inject("assertSyncStatsFailed", func(val failpoint.Value) {
if val.(bool) {
if !slowItems.IsSyncStatsFailed {
panic("isSyncStatsFailed should be true")
}
}
})
slowLog := sessVars.SlowLogFormat(slowItems)
logutil.SlowQueryLogger.Warn(slowLog)
if trace.IsEnabled() {
trace.Log(a.GoCtx, "details", slowLog)
}
traceevent.CheckFlightRecorderDumpTrigger(a.GoCtx, "dump_trigger.suspicious_event", slowQueryDumpTriggerCheck)
if !matchRules {
return
}
costTime := slowItems.TimeTotal
execDetail := slowItems.ExecDetail
if sessVars.InRestrictedSQL {
executor_metrics.TotalQueryProcHistogramInternal.Observe(costTime.Seconds())
executor_metrics.TotalCopProcHistogramInternal.Observe(execDetail.TimeDetail.ProcessTime.Seconds())
executor_metrics.TotalCopWaitHistogramInternal.Observe(execDetail.TimeDetail.WaitTime.Seconds())
} else {
executor_metrics.TotalQueryProcHistogramGeneral.Observe(costTime.Seconds())
executor_metrics.TotalCopProcHistogramGeneral.Observe(execDetail.TimeDetail.ProcessTime.Seconds())
executor_metrics.TotalCopWaitHistogramGeneral.Observe(execDetail.TimeDetail.WaitTime.Seconds())
if execDetail.ScanDetail != nil && execDetail.ScanDetail.ProcessedKeys != 0 {
executor_metrics.CopMVCCRatioHistogramGeneral.Observe(float64(execDetail.ScanDetail.TotalKeys) / float64(execDetail.ScanDetail.ProcessedKeys))
}
}
var userString string
if sessVars.User != nil {
userString = sessVars.User.String()
}
var tableIDs string
if len(sessVars.StmtCtx.TableIDs) > 0 {
tableIDs = strings.ReplaceAll(fmt.Sprintf("%v", sessVars.StmtCtx.TableIDs), " ", ",")
}
// TODO log slow query for cross keyspace query?
dom := domain.GetDomain(a.Ctx)
if dom != nil {
dom.LogSlowQuery(&domain.SlowQueryInfo{
SQL: slowItems.SQL,
Digest: slowItems.Digest,
Start: sessVars.StartTime,
Duration: costTime,
Detail: *execDetail,
Succ: succ,
ConnID: sessVars.ConnectionID,
SessAlias: sessVars.SessionAlias,
TxnTS: txnTS,
User: userString,
DB: sessVars.CurrentDB,
TableIDs: tableIDs,
IndexNames: slowItems.IndexNames,
Internal: sessVars.InRestrictedSQL,
})
}
}
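// Reading aid for LogSlowQuery above (a summary of the code, not additional behavior):
//  1. decide whether to log: the session/global slow log rules when EffectiveFields is set,
//     otherwise the instance slow threshold; Debug log level or an enabled trace forces logging;
//  2. apply the global slow log rate limiter;
//  3. format and write the slow log entry, then, only when the rules/threshold matched,
//     record the slow-query metrics and hand the entry to the domain's in-memory slow query collector.
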
func (a *ExecStmt) updateNetworkTrafficStatsAndMetrics() {
hasMPPTraffic := a.updateMPPNetworkTraffic()
tikvExecDetailRaw := a.GoCtx.Value(util.ExecDetailsKey)
if tikvExecDetailRaw != nil {
tikvExecDetail := tikvExecDetailRaw.(*util.ExecDetails)
executor_metrics.ExecutorNetworkTransmissionSentTiKVTotal.Add(float64(tikvExecDetail.UnpackedBytesSentKVTotal))
executor_metrics.ExecutorNetworkTransmissionSentTiKVCrossZone.Add(float64(tikvExecDetail.UnpackedBytesSentKVCrossZone))
executor_metrics.ExecutorNetworkTransmissionReceivedTiKVTotal.Add(float64(tikvExecDetail.UnpackedBytesReceivedKVTotal))
executor_metrics.ExecutorNetworkTransmissionReceivedTiKVCrossZone.Add(float64(tikvExecDetail.UnpackedBytesReceivedKVCrossZone))
if hasMPPTraffic {
executor_metrics.ExecutorNetworkTransmissionSentTiFlashTotal.Add(float64(tikvExecDetail.UnpackedBytesSentMPPTotal))
executor_metrics.ExecutorNetworkTransmissionSentTiFlashCrossZone.Add(float64(tikvExecDetail.UnpackedBytesSentMPPCrossZone))
executor_metrics.ExecutorNetworkTransmissionReceivedTiFlashTotal.Add(float64(tikvExecDetail.UnpackedBytesReceivedMPPTotal))
executor_metrics.ExecutorNetworkTransmissionReceivedTiFlashCrossZone.Add(float64(tikvExecDetail.UnpackedBytesReceivedMPPCrossZone))
}
}
}
func (a *ExecStmt) updateMPPNetworkTraffic() bool {
sessVars := a.Ctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
runtimeStatsColl := stmtCtx.RuntimeStatsColl
if runtimeStatsColl == nil {
return false
}
tiflashNetworkStats := runtimeStatsColl.GetStmtCopRuntimeStats().TiflashNetworkStats
if tiflashNetworkStats == nil {
return false
}
tikvExecDetailRaw := a.GoCtx.Value(util.ExecDetailsKey)
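// If no ExecDetails has been attached to the Go context yet, attach a fresh one so that
// the TiFlash network stats collected below have somewhere to accumulate.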
if tikvExecDetailRaw == nil {
tikvExecDetailRaw = &util.ExecDetails{}
a.GoCtx = context.WithValue(a.GoCtx, util.ExecDetailsKey, tikvExecDetailRaw)
}
tikvExecDetail := tikvExecDetailRaw.(*util.ExecDetails)
tiflashNetworkStats.UpdateTiKVExecDetails(tikvExecDetail)
return true
}
// getFlatPlan generates a FlatPhysicalPlan from the plan stored in stmtCtx.plan,
// then stores it in stmtCtx.flatPlan.
func getFlatPlan(stmtCtx *stmtctx.StatementContext) *plannercore.FlatPhysicalPlan {
pp := stmtCtx.GetPlan()
if pp == nil {
return nil
}
if flat := stmtCtx.GetFlatPlan(); flat != nil {
f := flat.(*plannercore.FlatPhysicalPlan)
return f
}
p := pp.(base.Plan)
flat := plannercore.FlattenPhysicalPlan(p, false)
if flat != nil {
stmtCtx.SetFlatPlan(flat)
return flat
}
return nil
}
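// getFlatPlan above, and getBinaryPlan, GetPlanDigest and getEncodedPlan below, all follow the
// same compute-once pattern: return the value already cached in the StatementContext when present,
// otherwise derive it from the flat plan and cache it for later consumers of this statement.
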
func getBinaryPlan(sCtx sessionctx.Context) string {
stmtCtx := sCtx.GetSessionVars().StmtCtx
binaryPlan := stmtCtx.GetBinaryPlan()
if len(binaryPlan) > 0 {
return binaryPlan
}
flat := getFlatPlan(stmtCtx)
binaryPlan = plannercore.BinaryPlanStrFromFlatPlan(sCtx.GetPlanCtx(), flat, false)
stmtCtx.SetBinaryPlan(binaryPlan)
return binaryPlan
}
// getPlanTree will try to get the select plan tree if the plan is a select, or the select plan of a delete/update/insert statement.
func getPlanTree(stmtCtx *stmtctx.StatementContext) string {
cfg := config.GetGlobalConfig()
if atomic.LoadUint32(&cfg.Instance.RecordPlanInSlowLog) == 0 {
return ""
}
planTree, _ := getEncodedPlan(stmtCtx, false)
if len(planTree) == 0 {
return planTree
}
return variable.SlowLogPlanPrefix + planTree + variable.SlowLogPlanSuffix
}
// GetPlanDigest returns the normalized plan and its digest, computing and caching them in the statement context when they are not already present.
func GetPlanDigest(stmtCtx *stmtctx.StatementContext) (string, *parser.Digest) {
normalized, planDigest := stmtCtx.GetPlanDigest()
if len(normalized) > 0 && planDigest != nil {
return normalized, planDigest
}
flat := getFlatPlan(stmtCtx)
normalized, planDigest = plannercore.NormalizeFlatPlan(flat)
stmtCtx.SetPlanDigest(normalized, planDigest)
return normalized, planDigest
}
// getEncodedPlan gets the encoded plan, and generates the hint string if indicated.
func getEncodedPlan(stmtCtx *stmtctx.StatementContext, genHint bool) (encodedPlan, hintStr string) {
var hintSet bool
encodedPlan = stmtCtx.GetEncodedPlan()
hintStr, hintSet = stmtCtx.GetPlanHint()
if len(encodedPlan) > 0 && (!genHint || hintSet) {
return
}
flat := getFlatPlan(stmtCtx)
if len(encodedPlan) == 0 {
encodedPlan = plannercore.EncodeFlatPlan(flat)
stmtCtx.SetEncodedPlan(encodedPlan)
}
if genHint {
hints := plannercore.GenHintsFromFlatPlan(flat)
for _, tableHint := range stmtCtx.OriginalTableHints {
// some hints like 'memory_quota' cannot be extracted from the PhysicalPlan directly,
// so we have to iterate over all the hints provided by the user and keep the other necessary ones.
switch tableHint.HintName.L {
case hint.HintMemoryQuota, hint.HintUseToja, hint.HintNoIndexMerge,
hint.HintMaxExecutionTime, hint.HintIgnoreIndex, hint.HintReadFromStorage,
hint.HintMerge, hint.HintSemiJoinRewrite, hint.HintNoDecorrelate:
hints = append(hints, tableHint)
}
}
hintStr = hint.RestoreOptimizerHints(hints)
stmtCtx.SetPlanHint(hintStr)
}
return
}
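// Minimal usage sketch for getEncodedPlan (a hypothetical caller; it mirrors how the lazy
// GetEncodedPlan method below invokes it, with genHint controlling whether the hint string
// is also produced):
//
//	encoded, hints := getEncodedPlan(stmtCtx, true)
//	_ = encoded // textual encoded plan, cached in stmtCtx for later readers
//	_ = hints   // restored optimizer hints, including pass-through hints such as memory_quota
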
type planDigestAlias struct {
Digest string
}
func (digest planDigestAlias) planDigestDumpTriggerCheck(config *traceevent.DumpTriggerConfig) bool {
return config.UserCommand.PlanDigest == digest.Digest
}
// SummaryStmt collects statements for information_schema.statements_summary
func (a *ExecStmt) SummaryStmt(succ bool) {
sessVars := a.Ctx.GetSessionVars()
var userString string
if sessVars.User != nil {
userString = sessVars.User.Username
}
// Internal SQLs must also be recorded to keep the consistency of `PrevStmt` and `PrevStmtDigest`.
// If this SQL is under `explain explore {SQL}`, we still want to record it in the stmt summary.
isInternalSQL := (sessVars.InRestrictedSQL || len(userString) == 0) && !sessVars.InExplainExplore
if !stmtsummaryv2.Enabled() || (isInternalSQL && !stmtsummaryv2.EnabledInternal()) {
sessVars.SetPrevStmtDigest("")
return
}
// Ignore `PREPARE` statements, but record `EXECUTE` statements.
if _, ok := a.StmtNode.(*ast.PrepareStmt); ok {
return
}
stmtCtx := sessVars.StmtCtx
// Make sure StmtType is filled even if succ is false.
if stmtCtx.StmtType == "" {
stmtCtx.StmtType = stmtctx.GetStmtLabel(context.Background(), a.StmtNode)
}
normalizedSQL, digest := stmtCtx.SQLDigest()
costTime := sessVars.GetTotalCostDuration()
charset, collation := sessVars.GetCharsetInfo()
var prevSQL, prevSQLDigest string
if _, ok := a.StmtNode.(*ast.CommitStmt); ok {
// If prevSQLDigest is not recorded, it means this `commit` is the first SQL since stmt summary was enabled,
// so it's OK to just ignore it.
if prevSQLDigest = sessVars.GetPrevStmtDigest(); len(prevSQLDigest) == 0 {
return
}
prevSQL = sessVars.PrevStmt.String()
}
sessVars.SetPrevStmtDigest(digest.String())
// Generating the plan digest is slow, so skip it here for 'Point_Get' plans and leave it to be
// generated lazily only when it is actually needed. For a point get, different SQLs lead to
// different plans, so the SQL digest is enough to distinguish plans in this case.
var planDigest string
if a.Plan.TP() != plancodec.TypePointGet {
_, tmp := GetPlanDigest(stmtCtx)
planDigest = tmp.String()
traceevent.CheckFlightRecorderDumpTrigger(a.GoCtx, "dump_trigger.user_command.plan_digest", planDigestAlias{planDigest}.planDigestDumpTriggerCheck)
}
execDetail := stmtCtx.GetExecDetails()
copTaskInfo := stmtCtx.CopTasksSummary()
memMax := sessVars.MemTracker.MaxConsumed()
diskMax := sessVars.DiskTracker.MaxConsumed()
stmtDetail, tikvExecDetail, ruDetail := execdetails.GetExecDetailsFromContext(a.GoCtx)
if stmtCtx.WaitLockLeaseTime > 0 {
if execDetail.BackoffSleep == nil {
execDetail.BackoffSleep = make(map[string]time.Duration)
}
execDetail.BackoffSleep["waitLockLeaseForCacheTable"] = stmtCtx.WaitLockLeaseTime
execDetail.BackoffTime += stmtCtx.WaitLockLeaseTime
execDetail.TimeDetail.WaitTime += stmtCtx.WaitLockLeaseTime
}
var keyspaceID uint32
keyspaceName := keyspace.GetKeyspaceNameBySettings()
if !keyspace.IsKeyspaceNameEmpty(keyspaceName) {
keyspaceID = uint32(a.Ctx.GetStore().GetCodec().GetKeyspaceID())
}
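// Reuse the per-session cached StmtExecInfo object rather than allocating a new one for every
// statement; the fields consumed by stmt summary are populated below before it is handed over.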
if sessVars.CacheStmtExecInfo == nil {
sessVars.CacheStmtExecInfo = &stmtsummary.StmtExecInfo{}
}
stmtExecInfo := sessVars.CacheStmtExecInfo
stmtExecInfo.SchemaName = strings.ToLower(sessVars.CurrentDB)
stmtExecInfo.Charset = charset
stmtExecInfo.Collation = collation
stmtExecInfo.NormalizedSQL = normalizedSQL
stmtExecInfo.Digest = digest.String()
stmtExecInfo.PrevSQL = prevSQL
stmtExecInfo.PrevSQLDigest = prevSQLDigest
stmtExecInfo.PlanDigest = planDigest
stmtExecInfo.User = userString
stmtExecInfo.TotalLatency = costTime
stmtExecInfo.ParseLatency = sessVars.DurationParse
stmtExecInfo.CompileLatency = sessVars.DurationCompile
stmtExecInfo.StmtCtx = stmtCtx
stmtExecInfo.CopTasks = copTaskInfo
stmtExecInfo.ExecDetail = execDetail
stmtExecInfo.MemMax = memMax
stmtExecInfo.DiskMax = diskMax
stmtExecInfo.StartTime = sessVars.StartTime
stmtExecInfo.IsInternal = isInternalSQL
stmtExecInfo.Succeed = succ
stmtExecInfo.PlanInCache = sessVars.FoundInPlanCache
stmtExecInfo.PlanInBinding = sessVars.FoundInBinding
stmtExecInfo.ExecRetryCount = a.retryCount
stmtExecInfo.StmtExecDetails = stmtDetail
stmtExecInfo.ResultRows = stmtCtx.GetResultRowsCount()
stmtExecInfo.TiKVExecDetails = &tikvExecDetail
stmtExecInfo.Prepared = a.isPreparedStmt
stmtExecInfo.KeyspaceName = keyspaceName
stmtExecInfo.KeyspaceID = keyspaceID
stmtExecInfo.RUDetail = ruDetail
stmtExecInfo.ResourceGroupName = sessVars.StmtCtx.ResourceGroupName
stmtExecInfo.CPUUsages = sessVars.SQLCPUUsages.GetCPUUsages()
stmtExecInfo.PlanCacheUnqualified = sessVars.StmtCtx.PlanCacheUnqualified()
stmtExecInfo.LazyInfo = a
if a.retryCount > 0 {
stmtExecInfo.ExecRetryTime = costTime - sessVars.DurationParse - sessVars.DurationCompile - time.Since(a.retryStartTime)
}
stmtExecInfo.MemArbitration = stmtCtx.MemTracker.MemArbitration().Seconds()
stmtsummaryv2.Add(stmtExecInfo)
}
// GetOriginalSQL implements StmtExecLazyInfo interface.
func (a *ExecStmt) GetOriginalSQL() string {
stmt := a.getLazyStmtText()
return stmt.String()
}
// GetEncodedPlan implements StmtExecLazyInfo interface.
func (a *ExecStmt) GetEncodedPlan() (p string, h string, e any) {
defer func() {
e = recover()
if e != nil {
logutil.BgLogger().Warn("fail to generate plan info",
zap.Stack("backtrace"),
zap.Any("error", e))
}
}()
sessVars := a.Ctx.GetSessionVars()
p, h = getEncodedPlan(sessVars.StmtCtx, !sessVars.InRestrictedSQL)
return
}
// GetBinaryPlan implements StmtExecLazyInfo interface.
func (a *ExecStmt) GetBinaryPlan() string {
if variable.GenerateBinaryPlan.Load() {
return getBinaryPlan(a.Ctx)
}
return ""
}
// GetPlanDigest implements StmtExecLazyInfo interface.
func (a *ExecStmt) GetPlanDigest() string {
if a.Plan.TP() == plancodec.TypePointGet {
_, planDigest := GetPlanDigest(a.Ctx.GetSessionVars().StmtCtx)
return planDigest.String()
}
return ""
}
// GetBindingSQLAndDigest implements StmtExecLazyInfo interface, providing the
// normalized SQL and digest, with additional rules specific to bindings.
func (a *ExecStmt) GetBindingSQLAndDigest() (s string, d string) {
normalizedSQL, digest := parser.NormalizeDigestForBinding(bindinfo.RestoreDBForBinding(a.StmtNode, a.Ctx.GetSessionVars().CurrentDB))
return normalizedSQL, digest.String()
}
// GetTextToLog returns the query text to log.
func (a *ExecStmt) GetTextToLog(keepHint bool) string {
var sql string
sessVars := a.Ctx.GetSessionVars()
rmode := sessVars.EnableRedactLog
if rmode == errors.RedactLogEnable {
if keepHint {
sql = parser.NormalizeKeepHint(sessVars.StmtCtx.OriginalSQL)
} else {
sql, _ = sessVars.StmtCtx.SQLDigest()
}
} else if sensitiveStmt, ok := a.StmtNode.(ast.SensitiveStmtNode); ok {
sql = sensitiveStmt.SecureText()
} else {
sql = redact.String(rmode, sessVars.StmtCtx.OriginalSQL+sessVars.PlanCacheParams.String())
}
return sql
}
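// In short, GetTextToLog above yields: the normalized digest text (or a hint-preserving
// normalization when keepHint is set) when redact log is fully enabled, SecureText() for
// statements implementing ast.SensitiveStmtNode (typically ones carrying credentials), and
// otherwise the original SQL plus plan-cache parameters filtered through redact.String.
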
// getLazyStmtText is equivalent to `a.GetTextToLog(false)`. Note that s.Params is a shallow copy of
// `sessVars.PlanCacheParams`, so the lazy text can only be used within the current statement context.
func (a *ExecStmt) getLazyStmtText() (s variable.LazyStmtText) {
sessVars := a.Ctx.GetSessionVars()
rmode := sessVars.EnableRedactLog
if rmode == errors.RedactLogEnable {
sql, _ := sessVars.StmtCtx.SQLDigest()
s.SetText(sql)
} else if sensitiveStmt, ok := a.StmtNode.(ast.SensitiveStmtNode); ok {
sql := sensitiveStmt.SecureText()
s.SetText(sql)
} else {
s.Redact = rmode
s.SQL = sessVars.StmtCtx.OriginalSQL
s.Params = *sessVars.PlanCacheParams
}
return
}
// updatePrevStmt is equivalent to `sessVars.PrevStmt = FormatSQL(a.GetTextToLog(false))`
func (a *ExecStmt) updatePrevStmt() {
sessVars := a.Ctx.GetSessionVars()
if sessVars.PrevStmt == nil {
sessVars.PrevStmt = &variable.LazyStmtText{Format: formatSQL}
}
rmode := sessVars.EnableRedactLog
if rmode == errors.RedactLogEnable {
sql, _ := sessVars.StmtCtx.SQLDigest()
sessVars.PrevStmt.SetText(sql)
} else if sensitiveStmt, ok := a.StmtNode.(ast.SensitiveStmtNode); ok {
sql := sensitiveStmt.SecureText()
sessVars.PrevStmt.SetText(sql)
} else {
sessVars.PrevStmt.Update(rmode, sessVars.StmtCtx.OriginalSQL, sessVars.PlanCacheParams)
}
}
func (a *ExecStmt) observeStmtBeginForTopSQL(ctx context.Context) context.Context {
if !topsqlstate.TopSQLEnabled() && IsFastPlan(a.Plan) {
// To reduce the performance impact on fast plans, skip them when Top SQL is disabled.
// Dropping them does not cause a notable accuracy issue in Top SQL.
return ctx
}
vars := a.Ctx.GetSessionVars()
sc := vars.StmtCtx
normalizedSQL, sqlDigest := sc.SQLDigest()
normalizedPlan, planDigest := GetPlanDigest(sc)
var sqlDigestByte, planDigestByte []byte
if sqlDigest != nil {
sqlDigestByte = sqlDigest.Bytes()
}
if planDigest != nil {
planDigestByte = planDigest.Bytes()
}
stats := a.Ctx.GetStmtStats()
if !topsqlstate.TopSQLEnabled() {
// Always attach the SQL and plan info; it is used to catch the running SQL in case Top SQL is enabled during execution.
if stats != nil {
stats.OnExecutionBegin(sqlDigestByte, planDigestByte, vars.InPacketBytes.Load())
}
return topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest)
}
if stats != nil {
stats.OnExecutionBegin(sqlDigestByte, planDigestByte, vars.InPacketBytes.Load())
// This is special logic prepared for TiKV's SQLExecCount.
sc.KvExecCounter = stats.CreateKvExecCounter(sqlDigestByte, planDigestByte)
}
isSQLRegistered := sc.IsSQLRegistered.Load()
if !isSQLRegistered {
topsql.RegisterSQL(normalizedSQL, sqlDigest, vars.InRestrictedSQL)
}
sc.IsSQLAndPlanRegistered.Store(true)
if len(normalizedPlan) == 0 {
return ctx
}
topsql.RegisterPlan(normalizedPlan, planDigest)
return topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest)
}
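// Note on the Top SQL begin/finish pairing: observeStmtBeginForTopSQL above registers the SQL
// and plan digests and attaches them to the returned context so the rest of this statement's
// execution can be attributed to them, while observeStmtFinishedForTopSQL below reports the
// total execution duration for the same digest pair when Top SQL is enabled.
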
// UpdatePlanCacheRuntimeInfo updates the runtime information of the plan in the plan cache.
func (a *ExecStmt) UpdatePlanCacheRuntimeInfo() {
if !vardef.EnableInstancePlanCache.Load() {
return // only record for Instance Plan Cache
}
v := a.Ctx.GetSessionVars().PlanCacheValue
if v == nil {
return
}
pcv, ok := v.(*plannercore.PlanCacheValue)
if !ok {
return
}
execDetail := a.Ctx.GetSessionVars().StmtCtx.GetExecDetails()
var procKeys, totKeys int64
if execDetail.ScanDetail != nil { // only supported by TiKV
procKeys = execDetail.ScanDetail.ProcessedKeys
totKeys = execDetail.ScanDetail.TotalKeys
}
costTime := a.Ctx.GetSessionVars().GetTotalCostDuration()
pcv.UpdateRuntimeInfo(procKeys, totKeys, int64(costTime))
a.Ctx.GetSessionVars().PlanCacheValue = nil // reset
}
func (a *ExecStmt) observeStmtFinishedForTopSQL() {
vars := a.Ctx.GetSessionVars()
if vars == nil {
return
}
if stats := a.Ctx.GetStmtStats(); stats != nil && topsqlstate.TopSQLEnabled() {
sqlDigest, planDigest := a.getSQLPlanDigest()
execDuration := vars.GetTotalCostDuration()
stats.OnExecutionFinished(sqlDigest, planDigest, execDuration, vars.OutPacketBytes.Load())
}
}
func (a *ExecStmt) getSQLPlanDigest() (sqlDigest, planDigest []byte) {
vars := a.Ctx.GetSessionVars()
if _, d := vars.StmtCtx.SQLDigest(); d != nil {
sqlDigest = d.Bytes()
}
if _, d := vars.StmtCtx.GetPlanDigest(); d != nil {
planDigest = d.Bytes()
}
return sqlDigest, planDigest
}
// Only select/delete/update/insert/execute statements are allowed to be captured by continuous capture.
func checkPlanReplayerContinuesCaptureValidStmt(stmtNode ast.StmtNode) bool {
switch stmtNode.(type) {
case *ast.SelectStmt, *ast.DeleteStmt, *ast.UpdateStmt, *ast.InsertStmt, *ast.ExecuteStmt:
return true
default:
return false
}
}
func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
}
handle := dom.GetPlanReplayerHandle()
if handle == nil {
return
}
tasks := handle.GetTasks()
if len(tasks) == 0 {
return
}
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest()
if sqlDigest == nil || planDigest == nil {
return
}
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
for _, task := range tasks {
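// A registered task matches when the SQL digest is equal and the task's plan digest is either
// an exact match or the wildcard "*", which captures any plan of that SQL.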
if task.SQLDigest == sqlDigest.String() {
if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() {
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false)
return
}
}
}
}
func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
}
handle := dom.GetPlanReplayerHandle()
if handle == nil {
return
}
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest()
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
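// Deduplicate within the session: once continuous capture has dumped this (SQL digest, plan
// digest) pair, skip it for subsequent executions in the same session.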
existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key)
if existed {
return
}
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true)
sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key)
}
func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode,
startTS uint64, isContinuesCapture bool) {
stmtCtx := sctx.GetSessionVars().StmtCtx
handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(bindinfo.SessionBindingHandle)
bindings := handle.GetAllSessionBindings()
dumpTask := &domain.PlanReplayerDumpTask{
PlanReplayerTaskKey: key,
StartTS: startTS,
TblStats: stmtCtx.TableStats,
SessionBindings: [][]*bindinfo.Binding{bindings},
SessionVars: sctx.GetSessionVars(),
ExecStmts: []ast.StmtNode{stmtNode},
Analyze: false,
IsCapture: true,
IsContinuesCapture: isContinuesCapture,
}
dumpTask.EncodedPlan, _ = getEncodedPlan(stmtCtx, false)
if execStmtAst, ok := stmtNode.(*ast.ExecuteStmt); ok {
planCacheStmt, err := plannercore.GetPreparedStmt(execStmtAst, sctx.GetSessionVars())
if err != nil {
logutil.BgLogger().Warn("fail to find prepared ast for dumping plan replayer", zap.String("category", "plan-replayer-capture"),
zap.String("sqlDigest", key.SQLDigest),
zap.String("planDigest", key.PlanDigest),
zap.Error(err))
} else {
dumpTask.ExecStmts = []ast.StmtNode{planCacheStmt.PreparedAst.Stmt}
}
}
domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask)
}