Files
tidb/pkg/planner/core/plan_cache.go

399 lines
15 KiB
Go

// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package core
import (
"context"
"math"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/planner/core/base"
core_metrics "github.com/pingcap/tidb/pkg/planner/core/metrics"
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
"github.com/pingcap/tidb/pkg/planner/core/resolve"
"github.com/pingcap/tidb/pkg/privilege"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
"github.com/pingcap/tidb/pkg/types"
driver "github.com/pingcap/tidb/pkg/types/parser_driver"
"github.com/pingcap/tidb/pkg/util/chunk"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
"github.com/pingcap/tidb/pkg/util/hint"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)
// PlanCacheKeyTestIssue43667 is only for test.
type PlanCacheKeyTestIssue43667 struct{}
// PlanCacheKeyTestIssue46760 is only for test.
type PlanCacheKeyTestIssue46760 struct{}
// PlanCacheKeyTestIssue47133 is only for test.
type PlanCacheKeyTestIssue47133 struct{}
// PlanCacheKeyTestClone is only for test.
type PlanCacheKeyTestClone struct{}
// PlanCacheKeyEnableInstancePlanCache is only for test.
type PlanCacheKeyEnableInstancePlanCache struct{}
// SetParameterValuesIntoSCtx sets these parameters into session context.
func SetParameterValuesIntoSCtx(sctx base.PlanContext, isNonPrep bool, markers []ast.ParamMarkerExpr, params []expression.Expression) error {
vars := sctx.GetSessionVars()
vars.PlanCacheParams.Reset()
for i, usingParam := range params {
var (
val types.Datum
err error
)
val, err = usingParam.Eval(sctx.GetExprCtx().GetEvalCtx(), chunk.Row{})
if err != nil {
return err
}
if isGetVarBinaryLiteral(sctx, usingParam) {
binVal, convErr := val.ToBytes()
if convErr != nil {
return convErr
}
val.SetBinaryLiteral(binVal)
}
if markers != nil {
param := markers[i].(*driver.ParamMarkerExpr)
param.Datum = val
param.InExecute = true
}
vars.PlanCacheParams.Append(val)
}
vars.PlanCacheParams.SetForNonPrepCache(isNonPrep)
return nil
}
func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema, stmt *PlanCacheStmt, params []expression.Expression) error {
vars := sctx.GetSessionVars()
stmtAst := stmt.PreparedAst
vars.StmtCtx.StmtType = stmtAst.StmtType
// step 1: check parameter number
if len(stmt.Params) != len(params) {
return errors.Trace(plannererrors.ErrWrongParamCount)
}
// step 2: set parameter values
if err := SetParameterValuesIntoSCtx(sctx.GetPlanCtx(), isNonPrepared, stmt.Params, params); err != nil {
return errors.Trace(err)
}
// step 3: add metadata lock and check each table's schema version
schemaNotMatch := false
for i := range stmt.dbName {
tbl, ok := is.TableByID(ctx, stmt.tbls[i].Meta().ID)
if !ok {
tblByName, err := is.TableByName(context.Background(), stmt.dbName[i], stmt.tbls[i].Meta().Name)
if err != nil {
return plannererrors.ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error())
}
// Table ID is changed, for example, drop & create table, truncate table.
delete(stmt.RelateVersion, stmt.tbls[i].Meta().ID)
tbl = tblByName
}
// newTbl is the 'should be used' table info for this execution.
newTbl, err := tryLockMDLAndUpdateSchemaIfNecessary(ctx, sctx.GetPlanCtx(), stmt.dbName[i], tbl, is)
if err != nil {
logutil.BgLogger().Warn("meet error during tryLockMDLAndUpdateSchemaIfNecessary", zap.String("table name", tbl.Meta().Name.String()), zap.Error(err))
// Invalid the cache key related fields to avoid using plan cache.
stmt.RelateVersion[tbl.Meta().ID] = math.MaxUint64
schemaNotMatch = true
continue
}
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision {
schemaNotMatch = true
}
// Update the cache key related fields.
stmt.tbls[i] = newTbl
stmt.RelateVersion[newTbl.Meta().ID] = newTbl.Meta().Revision
}
// step 4: check schema version
if schemaNotMatch || stmt.SchemaVersion != is.SchemaMetaVersion() {
// In order to avoid some correctness issues, we have to clear the
// cached plan once the schema version is changed.
// Cached plan in prepared struct does NOT have a "cache key" with
// schema version like prepared plan cache key
stmt.PointGet.Executor = nil
stmt.PointGet.ColumnInfos = nil
// If the schema version has changed we need to preprocess it again,
// if this time it failed, the real reason for the error is schema changed.
// Example:
// When running update in prepared statement's schema version distinguished from the one of execute statement
// We should reset the tableRefs in the prepared update statements, otherwise, the ast nodes still hold the old
// tableRefs columnInfo which will cause chaos in logic of trying point get plan. (should ban non-public column)
ret := &PreprocessorReturn{InfoSchema: is}
nodeW := resolve.NewNodeW(stmtAst.Stmt)
err := Preprocess(ctx, sctx, nodeW, InPrepare, WithPreprocessorReturn(ret))
if err != nil {
return plannererrors.ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error())
}
stmt.ResolveCtx = nodeW.GetResolveContext()
stmt.SchemaVersion = is.SchemaMetaVersion()
}
// step 5: handle expiration
// If the lastUpdateTime less than expiredTimeStamp4PC,
// it means other sessions have executed 'admin flush instance plan_cache'.
// So we need to clear the current session's plan cache.
// And update lastUpdateTime to the newest one.
expiredTimeStamp4PC := domain.GetDomain(sctx).ExpiredTimeStamp4PC()
if stmt.StmtCacheable && expiredTimeStamp4PC.Compare(vars.LastUpdateTime4PC) > 0 {
sctx.GetSessionPlanCache().DeleteAll()
vars.LastUpdateTime4PC = expiredTimeStamp4PC
}
// step 6: initialize the tableInfo2UnionScan, which indicates which tables are dirty.
for _, tbl := range stmt.tbls {
tblInfo := tbl.Meta()
if tableHasDirtyContent(sctx.GetPlanCtx(), tblInfo) {
sctx.GetSessionVars().StmtCtx.TblInfo2UnionScan[tblInfo] = true
}
}
return nil
}
// GetPlanFromPlanCache is the entry point of Plan Cache.
// It tries to get a valid cached plan from plan cache.
// If there is no such a plan, it'll call the optimizer to generate a new one.
// isNonPrepared indicates whether to use the non-prepared plan cache or the prepared plan cache.
func GetPlanFromPlanCache(ctx context.Context, sctx sessionctx.Context,
isNonPrepared bool, is infoschema.InfoSchema, stmt *PlanCacheStmt,
params []expression.Expression) (plan base.Plan, names []*types.FieldName, err error) {
if err := planCachePreprocess(ctx, sctx, isNonPrepared, is, stmt, params); err != nil {
return nil, nil, err
}
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
cacheEnabled := false
if isNonPrepared {
stmtCtx.SetCacheType(contextutil.SessionNonPrepared)
cacheEnabled = sessVars.EnableNonPreparedPlanCache // plan-cache might be disabled after prepare.
} else {
stmtCtx.SetCacheType(contextutil.SessionPrepared)
cacheEnabled = sessVars.EnablePreparedPlanCache
}
if stmt.StmtCacheable && cacheEnabled {
stmtCtx.EnablePlanCache()
}
if stmt.UncacheableReason != "" {
stmtCtx.WarnSkipPlanCache(stmt.UncacheableReason)
}
var cacheKey, binding, reason string
var cacheable bool
if stmtCtx.UseCache() {
cacheKey, binding, cacheable, reason, err = NewPlanCacheKey(sctx, stmt)
if err != nil {
return nil, nil, err
}
if !cacheable {
stmtCtx.SetSkipPlanCache(reason)
}
}
paramTypes := parseParamTypes(sctx, params)
if stmtCtx.UseCache() {
plan, outputCols, stmtHints, hit := lookupPlanCache(ctx, sctx, cacheKey, paramTypes)
skipPrivCheck := stmt.PointGet.Executor != nil // this case is specially handled
if hit && instancePlanCacheEnabled(ctx) {
plan, hit = clonePlanForInstancePlanCache(ctx, sctx, stmt, plan)
}
if hit {
if plan, ok, err := adjustCachedPlan(ctx, sctx, plan, stmtHints, isNonPrepared, skipPrivCheck, binding, is, stmt); err != nil || ok {
return plan, outputCols, err
}
}
}
return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, binding, paramTypes)
}
func clonePlanForInstancePlanCache(ctx context.Context, sctx sessionctx.Context,
stmt *PlanCacheStmt, plan base.Plan) (clonedPlan base.Plan, ok bool) {
defer func(begin time.Time) {
if ok {
core_metrics.GetPlanCacheCloneDuration().Observe(time.Since(begin).Seconds())
}
}(time.Now())
fastPoint := stmt.PointGet.Executor != nil // this case is specially handled
pointPlan, isPoint := plan.(*physicalop.PointGetPlan)
if fastPoint && isPoint { // special optimization for fast point plans
if stmt.PointGet.FastPlan == nil {
stmt.PointGet.FastPlan = new(physicalop.PointGetPlan)
}
FastClonePointGetForPlanCache(sctx.GetPlanCtx(), pointPlan, stmt.PointGet.FastPlan)
clonedPlan = stmt.PointGet.FastPlan
} else {
clonedPlan, ok = plan.CloneForPlanCache(sctx.GetPlanCtx())
if !ok { // clone the value to solve concurrency problem
return nil, false
}
}
if intest.InTest && ctx.Value(PlanCacheKeyTestClone{}) != nil {
ctx.Value(PlanCacheKeyTestClone{}).(func(plan, cloned base.Plan))(plan, clonedPlan)
}
return clonedPlan, true
}
func instancePlanCacheEnabled(ctx context.Context) bool {
if intest.InTest && ctx.Value(PlanCacheKeyEnableInstancePlanCache{}) != nil {
return true
}
enableInstancePlanCache := vardef.EnableInstancePlanCache.Load()
return enableInstancePlanCache
}
func lookupPlanCache(ctx context.Context, sctx sessionctx.Context, cacheKey string,
paramTypes []*types.FieldType) (plan base.Plan, outputCols types.NameSlice, stmtHints *hint.StmtHints, hit bool) {
useInstanceCache := instancePlanCacheEnabled(ctx)
defer func(begin time.Time) {
if hit {
core_metrics.GetPlanCacheLookupDuration(useInstanceCache).Observe(time.Since(begin).Seconds())
}
}(time.Now())
var v any
if useInstanceCache {
v, hit = domain.GetDomain(sctx).GetInstancePlanCache().Get(cacheKey, paramTypes)
} else {
v, hit = sctx.GetSessionPlanCache().Get(cacheKey, paramTypes)
}
if !hit {
return nil, nil, nil, false
}
pcv := v.(*PlanCacheValue)
sctx.GetSessionVars().PlanCacheValue = pcv
return pcv.Plan, pcv.OutputColumns, pcv.StmtHints, true
}
func adjustCachedPlan(ctx context.Context, sctx sessionctx.Context,
plan base.Plan, stmtHints *hint.StmtHints, isNonPrepared, skipPrivCheck bool,
bindSQL string, is infoschema.InfoSchema, stmt *PlanCacheStmt) (
base.Plan, bool, error) {
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
if !skipPrivCheck { // keep the prior behavior
if err := checkPreparedPriv(ctx, sctx, stmt, is); err != nil {
return nil, false, err
}
}
if !RebuildPlan4CachedPlan(plan) {
return nil, false, nil
}
sessVars.FoundInPlanCache = true
if len(bindSQL) > 0 { // We're using binding, set this to true.
sessVars.FoundInBinding = true
}
if metrics.ResettablePlanCacheCounterFortTest {
metrics.PlanCacheCounter.WithLabelValues("prepare").Inc()
} else {
core_metrics.GetPlanCacheHitCounter(isNonPrepared).Inc()
}
stmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
stmtCtx.StmtHints = *stmtHints
return plan, true, nil
}
// generateNewPlan call the optimizer to generate a new plan for current statement
// and try to add it to cache
func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema,
stmt *PlanCacheStmt, cacheKey, binding string, paramTypes []*types.FieldType) (base.Plan, []*types.FieldName, error) {
stmtAst := stmt.PreparedAst
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
core_metrics.GetPlanCacheMissCounter(isNonPrepared).Inc()
nodeW := resolve.NewNodeWWithCtx(stmtAst.Stmt, stmt.ResolveCtx)
p, names, err := OptimizeAstNodeNoCache(ctx, sctx, nodeW, is)
if err != nil {
return nil, nil, err
}
// check whether this plan is cacheable.
if stmtCtx.UseCache() {
if cacheable, reason := isPlanCacheable(sctx.GetPlanCtx(), p, len(paramTypes), len(stmt.limits), stmt.hasSubquery); !cacheable {
stmtCtx.SetSkipPlanCache(reason)
}
}
// put this plan into the plan cache.
if stmtCtx.UseCache() {
stmt.NormalizedPlan, stmt.PlanDigest = NormalizePlan(p)
cached := NewPlanCacheValue(sctx, stmt, cacheKey, binding, p, names, paramTypes, &stmtCtx.StmtHints)
stmtCtx.SetPlan(p)
stmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
if instancePlanCacheEnabled(ctx) {
if cloned, ok := p.CloneForPlanCache(sctx.GetPlanCtx()); ok {
// Clone this plan before putting it into the cache to avoid read-write DATA RACE. For example,
// before this session finishes the execution, the next session has started cloning this plan.
// Time: | ------------------------------------------------------------------------------- |
// Sess1: | put plan into cache | ----------- execution (might modify the plan) ----------- |
// Sess2: | start | ------- hit this plan and clone it (DATA RACE) ------- |
cached.Plan = cloned
domain.GetDomain(sctx).GetInstancePlanCache().Put(cacheKey, cached, paramTypes)
}
} else {
sctx.GetSessionPlanCache().Put(cacheKey, cached, paramTypes)
}
sctx.GetSessionVars().PlanCacheValue = cached
}
sessVars.FoundInPlanCache = false
return p, names, err
}
// checkPreparedPriv checks the privilege of the prepared statement
func checkPreparedPriv(ctx context.Context, sctx sessionctx.Context, stmt *PlanCacheStmt, is infoschema.InfoSchema) error {
if pm := privilege.GetPrivilegeManager(sctx); pm != nil {
visitInfo := VisitInfo4PrivCheck(ctx, is, stmt.PreparedAst.Stmt, stmt.VisitInfos)
if err := CheckPrivilege(sctx.GetSessionVars().ActiveRoles, pm, visitInfo); err != nil {
return err
}
}
err := CheckTableLock(sctx, is, stmt.VisitInfos)
return err
}
// IsSafeToReusePointGetExecutor checks whether this is a PointGet Plan and safe to reuse its executor.
func IsSafeToReusePointGetExecutor(sctx sessionctx.Context, is infoschema.InfoSchema, stmt *PlanCacheStmt) bool {
if staleread.IsStmtStaleness(sctx) {
return false
}
// check auto commit
if !IsAutoCommitTxn(sctx.GetSessionVars()) {
return false
}
if stmt.SchemaVersion != is.SchemaMetaVersion() {
return false
}
return true
}