// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
	"context"
	"math"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/pkg/domain"
	"github.com/pingcap/tidb/pkg/expression"
	"github.com/pingcap/tidb/pkg/infoschema"
	"github.com/pingcap/tidb/pkg/metrics"
	"github.com/pingcap/tidb/pkg/parser/ast"
	"github.com/pingcap/tidb/pkg/planner/core/base"
	core_metrics "github.com/pingcap/tidb/pkg/planner/core/metrics"
	"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
	"github.com/pingcap/tidb/pkg/planner/core/resolve"
	"github.com/pingcap/tidb/pkg/privilege"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/sessionctx/vardef"
	"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
	"github.com/pingcap/tidb/pkg/types"
	driver "github.com/pingcap/tidb/pkg/types/parser_driver"
	"github.com/pingcap/tidb/pkg/util/chunk"
	contextutil "github.com/pingcap/tidb/pkg/util/context"
	"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
	"github.com/pingcap/tidb/pkg/util/hint"
	"github.com/pingcap/tidb/pkg/util/intest"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"go.uber.org/zap"
)

// PlanCacheKeyTestIssue43667 is only for test.
type PlanCacheKeyTestIssue43667 struct{}

// PlanCacheKeyTestIssue46760 is only for test.
type PlanCacheKeyTestIssue46760 struct{}

// PlanCacheKeyTestIssue47133 is only for test.
type PlanCacheKeyTestIssue47133 struct{}

// PlanCacheKeyTestClone is only for test.
type PlanCacheKeyTestClone struct{}

// PlanCacheKeyEnableInstancePlanCache is only for test.
type PlanCacheKeyEnableInstancePlanCache struct{}

// SetParameterValuesIntoSCtx evaluates the given parameters and sets their values into the session context.
func SetParameterValuesIntoSCtx(sctx base.PlanContext, isNonPrep bool, markers []ast.ParamMarkerExpr, params []expression.Expression) error {
	vars := sctx.GetSessionVars()
	vars.PlanCacheParams.Reset()
	for i, usingParam := range params {
		val, err := usingParam.Eval(sctx.GetExprCtx().GetEvalCtx(), chunk.Row{})
		if err != nil {
			return err
		}
		if isGetVarBinaryLiteral(sctx, usingParam) {
			binVal, convErr := val.ToBytes()
			if convErr != nil {
				return convErr
			}
			val.SetBinaryLiteral(binVal)
		}
		if markers != nil {
			param := markers[i].(*driver.ParamMarkerExpr)
			param.Datum = val
			param.InExecute = true
		}
		vars.PlanCacheParams.Append(val)
	}
	vars.PlanCacheParams.SetForNonPrepCache(isNonPrep)
	return nil
}

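// planCachePreprocess does the preparation work before looking up the plan
// cache: it checks the parameter count, evaluates and binds the parameter
// values, acquires metadata locks on the referenced tables, invalidates
// stale schema information, handles plan-cache expiration, and records
// which tables have uncommitted changes in the current transaction.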
func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema, stmt *PlanCacheStmt, params []expression.Expression) error {
	vars := sctx.GetSessionVars()
	stmtAst := stmt.PreparedAst
	vars.StmtCtx.StmtType = stmtAst.StmtType

	// step 1: check parameter number
	if len(stmt.Params) != len(params) {
		return errors.Trace(plannererrors.ErrWrongParamCount)
	}

	// step 2: set parameter values
	if err := SetParameterValuesIntoSCtx(sctx.GetPlanCtx(), isNonPrepared, stmt.Params, params); err != nil {
		return errors.Trace(err)
	}

	// step 3: add metadata lock and check each table's schema version
	schemaNotMatch := false
	for i := range stmt.dbName {
		tbl, ok := is.TableByID(ctx, stmt.tbls[i].Meta().ID)
		if !ok {
			tblByName, err := is.TableByName(context.Background(), stmt.dbName[i], stmt.tbls[i].Meta().Name)
			if err != nil {
				return plannererrors.ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error())
			}
			// The table ID has changed, e.g. after DROP & CREATE TABLE or TRUNCATE TABLE.
			delete(stmt.RelateVersion, stmt.tbls[i].Meta().ID)
			tbl = tblByName
		}
		// newTbl is the table info that should be used for this execution.
		newTbl, err := tryLockMDLAndUpdateSchemaIfNecessary(ctx, sctx.GetPlanCtx(), stmt.dbName[i], tbl, is)
		if err != nil {
			logutil.BgLogger().Warn("meet error during tryLockMDLAndUpdateSchemaIfNecessary", zap.String("table name", tbl.Meta().Name.String()), zap.Error(err))
			// Invalidate the cache-key-related fields to avoid using the plan cache.
			stmt.RelateVersion[tbl.Meta().ID] = math.MaxUint64
			schemaNotMatch = true
			continue
		}
		if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision {
			schemaNotMatch = true
		}
		// Update the cache-key-related fields.
		stmt.tbls[i] = newTbl
		stmt.RelateVersion[newTbl.Meta().ID] = newTbl.Meta().Revision
	}

	// step 4: check schema version
	if schemaNotMatch || stmt.SchemaVersion != is.SchemaMetaVersion() {
		// To avoid correctness issues, we have to clear the cached plan once
		// the schema version changes. The plan cached in the prepared struct
		// does NOT carry a schema version the way a prepared-plan-cache key does.
		stmt.PointGet.Executor = nil
		stmt.PointGet.ColumnInfos = nil
		// If the schema version has changed, we need to preprocess the statement
		// again; if preprocessing fails this time, the real cause of the error is
		// the schema change. For example, when an UPDATE is prepared under one
		// schema version and executed under another, we must reset the tableRefs
		// in the prepared update statement. Otherwise, the AST nodes still hold
		// the old tableRefs' columnInfo, which causes chaos in the point-get plan
		// logic (non-public columns should be banned).
		ret := &PreprocessorReturn{InfoSchema: is}
		nodeW := resolve.NewNodeW(stmtAst.Stmt)
		err := Preprocess(ctx, sctx, nodeW, InPrepare, WithPreprocessorReturn(ret))
		if err != nil {
			return plannererrors.ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error())
		}
		stmt.ResolveCtx = nodeW.GetResolveContext()
		stmt.SchemaVersion = is.SchemaMetaVersion()
	}

	// step 5: handle expiration
	// If lastUpdateTime is earlier than expiredTimeStamp4PC, another session has
	// executed 'admin flush instance plan_cache', so we need to clear the current
	// session's plan cache and advance lastUpdateTime to the newest timestamp.
	expiredTimeStamp4PC := domain.GetDomain(sctx).ExpiredTimeStamp4PC()
	if stmt.StmtCacheable && expiredTimeStamp4PC.Compare(vars.LastUpdateTime4PC) > 0 {
		sctx.GetSessionPlanCache().DeleteAll()
		vars.LastUpdateTime4PC = expiredTimeStamp4PC
	}

	// step 6: initialize TblInfo2UnionScan, which indicates which tables are dirty.
	for _, tbl := range stmt.tbls {
		tblInfo := tbl.Meta()
		if tableHasDirtyContent(sctx.GetPlanCtx(), tblInfo) {
			sctx.GetSessionVars().StmtCtx.TblInfo2UnionScan[tblInfo] = true
		}
	}

	return nil
}

// GetPlanFromPlanCache is the entry point of the plan cache.
// It tries to fetch a valid cached plan from the plan cache.
// If there is no such plan, it calls the optimizer to generate a new one.
// isNonPrepared indicates whether to use the non-prepared plan cache or the prepared plan cache.
func GetPlanFromPlanCache(ctx context.Context, sctx sessionctx.Context,
	isNonPrepared bool, is infoschema.InfoSchema, stmt *PlanCacheStmt,
	params []expression.Expression) (plan base.Plan, names []*types.FieldName, err error) {
	if err := planCachePreprocess(ctx, sctx, isNonPrepared, is, stmt, params); err != nil {
		return nil, nil, err
	}

	sessVars := sctx.GetSessionVars()
	stmtCtx := sessVars.StmtCtx
	cacheEnabled := false
	if isNonPrepared {
		stmtCtx.SetCacheType(contextutil.SessionNonPrepared)
		cacheEnabled = sessVars.EnableNonPreparedPlanCache // the plan cache might have been disabled after prepare
	} else {
		stmtCtx.SetCacheType(contextutil.SessionPrepared)
		cacheEnabled = sessVars.EnablePreparedPlanCache
	}
	if stmt.StmtCacheable && cacheEnabled {
		stmtCtx.EnablePlanCache()
	}
	if stmt.UncacheableReason != "" {
		stmtCtx.WarnSkipPlanCache(stmt.UncacheableReason)
	}

	var cacheKey, binding, reason string
	var cacheable bool
	if stmtCtx.UseCache() {
		cacheKey, binding, cacheable, reason, err = NewPlanCacheKey(sctx, stmt)
		if err != nil {
			return nil, nil, err
		}
		if !cacheable {
			stmtCtx.SetSkipPlanCache(reason)
		}
	}

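	// The parameter types take part in the cache lookup: a cached plan can only
	// be reused when the current parameters' types match those the plan was
	// generated with.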
	paramTypes := parseParamTypes(sctx, params)
	if stmtCtx.UseCache() {
		plan, outputCols, stmtHints, hit := lookupPlanCache(ctx, sctx, cacheKey, paramTypes)
		skipPrivCheck := stmt.PointGet.Executor != nil // this case is specially handled
		if hit && instancePlanCacheEnabled(ctx) {
			plan, hit = clonePlanForInstancePlanCache(ctx, sctx, stmt, plan)
		}
		if hit {
			if plan, ok, err := adjustCachedPlan(ctx, sctx, plan, stmtHints, isNonPrepared, skipPrivCheck, binding, is, stmt); err != nil || ok {
				return plan, outputCols, err
			}
		}
	}

	return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, binding, paramTypes)
}

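// clonePlanForInstancePlanCache clones a plan fetched from the instance-level
// plan cache before reuse, since the cached copy may be read by other sessions
// concurrently. Fast point-get plans are cloned through a cheaper specialized path.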
func clonePlanForInstancePlanCache(ctx context.Context, sctx sessionctx.Context,
	stmt *PlanCacheStmt, plan base.Plan) (clonedPlan base.Plan, ok bool) {
	defer func(begin time.Time) {
		if ok {
			core_metrics.GetPlanCacheCloneDuration().Observe(time.Since(begin).Seconds())
		}
	}(time.Now())
	fastPoint := stmt.PointGet.Executor != nil // this case is specially handled
	pointPlan, isPoint := plan.(*physicalop.PointGetPlan)
	if fastPoint && isPoint { // special optimization for fast point plans
		if stmt.PointGet.FastPlan == nil {
			stmt.PointGet.FastPlan = new(physicalop.PointGetPlan)
		}
		FastClonePointGetForPlanCache(sctx.GetPlanCtx(), pointPlan, stmt.PointGet.FastPlan)
		clonedPlan = stmt.PointGet.FastPlan
	} else {
		clonedPlan, ok = plan.CloneForPlanCache(sctx.GetPlanCtx())
		if !ok { // the plan must be cloned to avoid concurrency problems; give up if cloning fails
			return nil, false
		}
	}
	if intest.InTest && ctx.Value(PlanCacheKeyTestClone{}) != nil {
		ctx.Value(PlanCacheKeyTestClone{}).(func(plan, cloned base.Plan))(plan, clonedPlan)
	}
	return clonedPlan, true
}

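// instancePlanCacheEnabled reports whether the instance-level plan cache is
// enabled; in tests it can be forced on through the context.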
func instancePlanCacheEnabled(ctx context.Context) bool {
	if intest.InTest && ctx.Value(PlanCacheKeyEnableInstancePlanCache{}) != nil {
		return true
	}
	return vardef.EnableInstancePlanCache.Load()
}

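// lookupPlanCache fetches a cached plan for the given cache key and parameter
// types from the instance-level plan cache if it is enabled, and from the
// session-level plan cache otherwise.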
func lookupPlanCache(ctx context.Context, sctx sessionctx.Context, cacheKey string,
	paramTypes []*types.FieldType) (plan base.Plan, outputCols types.NameSlice, stmtHints *hint.StmtHints, hit bool) {
	useInstanceCache := instancePlanCacheEnabled(ctx)
	defer func(begin time.Time) {
		if hit {
			core_metrics.GetPlanCacheLookupDuration(useInstanceCache).Observe(time.Since(begin).Seconds())
		}
	}(time.Now())
	var v any
	if useInstanceCache {
		v, hit = domain.GetDomain(sctx).GetInstancePlanCache().Get(cacheKey, paramTypes)
	} else {
		v, hit = sctx.GetSessionPlanCache().Get(cacheKey, paramTypes)
	}
	if !hit {
		return nil, nil, nil, false
	}
	pcv := v.(*PlanCacheValue)
	sctx.GetSessionVars().PlanCacheValue = pcv
	return pcv.Plan, pcv.OutputColumns, pcv.StmtHints, true
}

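// adjustCachedPlan makes a cached plan ready for the current execution: it
// re-checks privileges (unless skipped), rebuilds the plan with the current
// parameter values, and updates plan-cache bookkeeping and metrics.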
func adjustCachedPlan(ctx context.Context, sctx sessionctx.Context,
	plan base.Plan, stmtHints *hint.StmtHints, isNonPrepared, skipPrivCheck bool,
	bindSQL string, is infoschema.InfoSchema, stmt *PlanCacheStmt) (
	base.Plan, bool, error) {
	sessVars := sctx.GetSessionVars()
	stmtCtx := sessVars.StmtCtx
	if !skipPrivCheck { // keep the prior behavior
		if err := checkPreparedPriv(ctx, sctx, stmt, is); err != nil {
			return nil, false, err
		}
	}
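	// Rebuild the cached plan for the current parameter values (e.g. re-derive
	// index ranges); if rebuilding fails, fall back to generating a new plan.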
	if !RebuildPlan4CachedPlan(plan) {
		return nil, false, nil
	}
	sessVars.FoundInPlanCache = true
	if len(bindSQL) > 0 { // a binding is in effect, so record that it was used
		sessVars.FoundInBinding = true
	}
	if metrics.ResettablePlanCacheCounterFortTest {
		metrics.PlanCacheCounter.WithLabelValues("prepare").Inc()
	} else {
		core_metrics.GetPlanCacheHitCounter(isNonPrepared).Inc()
	}
	stmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
	stmtCtx.StmtHints = *stmtHints
	return plan, true, nil
}

// generateNewPlan calls the optimizer to generate a new plan for the current
// statement and tries to add it to the plan cache.
func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema,
	stmt *PlanCacheStmt, cacheKey, binding string, paramTypes []*types.FieldType) (base.Plan, []*types.FieldName, error) {
	stmtAst := stmt.PreparedAst
	sessVars := sctx.GetSessionVars()
	stmtCtx := sessVars.StmtCtx

	core_metrics.GetPlanCacheMissCounter(isNonPrepared).Inc()
	nodeW := resolve.NewNodeWWithCtx(stmtAst.Stmt, stmt.ResolveCtx)
	p, names, err := OptimizeAstNodeNoCache(ctx, sctx, nodeW, is)
	if err != nil {
		return nil, nil, err
	}

	// check whether this plan is cacheable.
	if stmtCtx.UseCache() {
		if cacheable, reason := isPlanCacheable(sctx.GetPlanCtx(), p, len(paramTypes), len(stmt.limits), stmt.hasSubquery); !cacheable {
			stmtCtx.SetSkipPlanCache(reason)
		}
	}

	// put this plan into the plan cache.
	if stmtCtx.UseCache() {
		stmt.NormalizedPlan, stmt.PlanDigest = NormalizePlan(p)
		cached := NewPlanCacheValue(sctx, stmt, cacheKey, binding, p, names, paramTypes, &stmtCtx.StmtHints)
		stmtCtx.SetPlan(p)
		stmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
		if instancePlanCacheEnabled(ctx) {
			// Clone this plan before putting it into the instance cache to avoid a
			// read-write DATA RACE: before this session finishes its execution, the
			// next session may already have started cloning this plan.
			// Time:  | ------------------------------------------------------------------------- |
			// Sess1: | put plan into cache | --------- execution (might modify the plan) --------- |
			// Sess2: |        start        | ------- hit this plan and clone it (DATA RACE) ------ |
			if cloned, ok := p.CloneForPlanCache(sctx.GetPlanCtx()); ok {
				cached.Plan = cloned
				domain.GetDomain(sctx).GetInstancePlanCache().Put(cacheKey, cached, paramTypes)
			}
		} else {
			sctx.GetSessionPlanCache().Put(cacheKey, cached, paramTypes)
		}
		sctx.GetSessionVars().PlanCacheValue = cached
	}
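	// The plan was generated by the optimizer in this call rather than fetched
	// from the cache, so FoundInPlanCache is reset.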
	sessVars.FoundInPlanCache = false
	return p, names, err
}

// checkPreparedPriv checks the privileges of the prepared statement.
func checkPreparedPriv(ctx context.Context, sctx sessionctx.Context, stmt *PlanCacheStmt, is infoschema.InfoSchema) error {
	if pm := privilege.GetPrivilegeManager(sctx); pm != nil {
		visitInfo := VisitInfo4PrivCheck(ctx, is, stmt.PreparedAst.Stmt, stmt.VisitInfos)
		if err := CheckPrivilege(sctx.GetSessionVars().ActiveRoles, pm, visitInfo); err != nil {
			return err
		}
	}
	return CheckTableLock(sctx, is, stmt.VisitInfos)
}

// IsSafeToReusePointGetExecutor checks whether this plan is a PointGet plan whose executor is safe to reuse.
func IsSafeToReusePointGetExecutor(sctx sessionctx.Context, is infoschema.InfoSchema, stmt *PlanCacheStmt) bool {
	if staleread.IsStmtStaleness(sctx) {
		return false
	}
	// check auto commit
	if !IsAutoCommitTxn(sctx.GetSessionVars()) {
		return false
	}
	if stmt.SchemaVersion != is.SchemaMetaVersion() {
		return false
	}
	return true
}