From cfd875219ef36f01f5201e4e495f950b47539cd7 Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Wed, 1 Apr 2020 16:05:36 +0800 Subject: [PATCH] planner: invalidate cached plan if no UnionScan for tables that need it (#15429) --- executor/executor.go | 1 + planner/core/cache.go | 17 +++-- planner/core/common_plans.go | 38 +++++++---- planner/core/logical_plan_builder.go | 32 ++++------ planner/core/prepare_test.go | 94 ++++++++++++++++++++++++++++ planner/core/util.go | 17 +++++ sessionctx/stmtctx/stmtctx.go | 1 + 7 files changed, 161 insertions(+), 39 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index e3834929cb..a2853f3cec 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1642,6 +1642,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { } else if vars.StmtCtx.InSelectStmt { sc.PrevAffectedRows = -1 } + sc.TblInfo2UnionScan = make(map[*model.TableInfo]bool) errCount, warnCount := vars.StmtCtx.NumErrorWarnings() vars.SysErrorCount = errCount vars.SysWarningCount = warnCount diff --git a/planner/core/cache.go b/planner/core/cache.go index 546b797ded..a1ddb90eed 100644 --- a/planner/core/cache.go +++ b/planner/core/cache.go @@ -19,6 +19,7 @@ import ( "time" "github.com/pingcap/parser/ast" + "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" @@ -125,15 +126,21 @@ func NewPSTMTPlanCacheKey(sessionVars *variable.SessionVars, pstmtID uint32, sch // PSTMTPlanCacheValue stores the cached Statement and StmtNode. type PSTMTPlanCacheValue struct { - Plan Plan - OutPutNames []*types.FieldName + Plan Plan + OutPutNames []*types.FieldName + TblInfo2UnionScan map[*model.TableInfo]bool } // NewPSTMTPlanCacheValue creates a SQLCacheValue. 
-func NewPSTMTPlanCacheValue(plan Plan, names []*types.FieldName) *PSTMTPlanCacheValue { +func NewPSTMTPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*model.TableInfo]bool) *PSTMTPlanCacheValue { + dstMap := make(map[*model.TableInfo]bool) + for k, v := range srcMap { + dstMap[k] = v + } return &PSTMTPlanCacheValue{ - Plan: plan, - OutPutNames: names, + Plan: plan, + OutPutNames: names, + TblInfo2UnionScan: dstMap, } } diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index f6fff418b7..fc41830d14 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -295,20 +295,32 @@ func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context, if err := e.checkPreparedPriv(ctx, sctx, preparedStmt, is); err != nil { return err } - if metrics.ResettablePlanCacheCounterFortTest { - metrics.PlanCacheCounter.WithLabelValues("prepare").Inc() - } else { - planCacheCounter.Inc() - } cachedVal := cacheValue.(*PSTMTPlanCacheValue) - err := e.rebuildRange(cachedVal.Plan) - if err != nil { - return err + planValid := true + for tblInfo, unionScan := range cachedVal.TblInfo2UnionScan { + if !unionScan && tableHasDirtyContent(sctx, tblInfo) { + planValid = false + // TODO we can inject UnionScan into cached plan to avoid invalidating it, though + // rebuilding the filters in UnionScan is pretty trivial. 
+ sctx.PreparedPlanCache().Delete(cacheKey) + break + } + } + if planValid { + if metrics.ResettablePlanCacheCounterFortTest { + metrics.PlanCacheCounter.WithLabelValues("prepare").Inc() + } else { + planCacheCounter.Inc() + } + err := e.rebuildRange(cachedVal.Plan) + if err != nil { + return err + } + e.names = cachedVal.OutPutNames + e.Plan = cachedVal.Plan + stmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest) + return nil } - e.names = cachedVal.OutPutNames - e.Plan = cachedVal.Plan - stmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest) - return nil } } p, names, err := OptimizeAstNode(ctx, sctx, prepared.Stmt, is) @@ -324,7 +336,7 @@ func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context, isRange := e.isRangePartition(p) _, isTableDual := p.(*PhysicalTableDual) if !isTableDual && prepared.UseCache && !isRange { - cached := NewPSTMTPlanCacheValue(p, names) + cached := NewPSTMTPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan) preparedStmt.NormalizedPlan, preparedStmt.PlanDigest = NormalizePlan(p) stmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest) sctx.PreparedPlanCache().Put(cacheKey, cached) diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 9a81bb500e..a4e5dcea8b 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -2595,8 +2595,9 @@ func getStatsTable(ctx sessionctx.Context, tblInfo *model.TableInfo, pid int64) func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, asName *model.CIStr) (LogicalPlan, error) { dbName := tn.Schema + sessionVars := b.ctx.GetSessionVars() if dbName.L == "" { - dbName = model.NewCIStr(b.ctx.GetSessionVars().CurrentDB) + dbName = model.NewCIStr(sessionVars.CurrentDB) } tbl, err := b.is.TableByName(dbName, tn.Name) @@ -2606,8 +2607,8 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as tableInfo := 
tbl.Meta() var authErr error - if b.ctx.GetSessionVars().User != nil { - authErr = ErrTableaccessDenied.FastGenByArgs("SELECT", b.ctx.GetSessionVars().User.Username, b.ctx.GetSessionVars().User.Hostname, tableInfo.Name.L) + if sessionVars.User != nil { + authErr = ErrTableaccessDenied.FastGenByArgs("SELECT", sessionVars.User.Username, sessionVars.User.Hostname, tableInfo.Name.L) } b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SelectPriv, dbName.L, tableInfo.Name.L, "", authErr) @@ -2721,7 +2722,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as Hidden: col.Hidden, }) newCol := &expression.Column{ - UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: sessionVars.AllocPlanColumnID(), ID: col.ID, RetType: &col.FieldType, OrigName: names[i].String(), @@ -2768,27 +2769,16 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as } var result LogicalPlan = ds - - needUS := false - if pi := tableInfo.GetPartitionInfo(); pi == nil { - if b.ctx.HasDirtyContent(tableInfo.ID) { - needUS = true - } - } else { - // Currently, we'll add a UnionScan on every partition even though only one partition's data is changed. - // This is limited by current implementation of Partition Prune. It'll updated once we modify that part. 
- for _, partition := range pi.Definitions { - if b.ctx.HasDirtyContent(partition.ID) { - needUS = true - break - } - } - } - if needUS { + dirty := tableHasDirtyContent(b.ctx, tableInfo) + if dirty { us := LogicalUnionScan{handleCol: handleCol}.Init(b.ctx, b.getSelectOffset()) us.SetChildren(ds) result = us } + if sessionVars.StmtCtx.TblInfo2UnionScan == nil { + sessionVars.StmtCtx.TblInfo2UnionScan = make(map[*model.TableInfo]bool) + } + sessionVars.StmtCtx.TblInfo2UnionScan[tableInfo] = dirty for i, colExpr := range ds.Schema().Columns { var expr expression.Expression diff --git a/planner/core/prepare_test.go b/planner/core/prepare_test.go index 68c90d8519..41100c9208 100644 --- a/planner/core/prepare_test.go +++ b/planner/core/prepare_test.go @@ -581,3 +581,97 @@ func (s *testPrepareSerialSuite) TestConstPropAndPPDWithCache(c *C) { "0", )) } + +func (s *testPlanSerialSuite) TestPlanCacheUnionScan(c *C) { + store, dom, err := newStoreWithBootstrap() + c.Assert(err, IsNil) + tk := testkit.NewTestKit(c, store) + orgEnable := core.PreparedPlanCacheEnabled() + defer func() { + dom.Close() + store.Close() + core.SetPreparedPlanCache(orgEnable) + }() + core.SetPreparedPlanCache(true) + tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{ + PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64), + }) + c.Assert(err, IsNil) + pb := &dto.Metric{} + metrics.ResettablePlanCacheCounterFortTest = true + metrics.PlanCacheCounter.Reset() + counter := metrics.PlanCacheCounter.WithLabelValues("prepare") + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("drop table if exists t2") + tk.MustExec("create table t1(a int not null)") + tk.MustExec("create table t2(a int not null)") + tk.MustExec("prepare stmt1 from 'select * from t1 where a > ?'") + tk.MustExec("set @p0 = 0") + tk.MustQuery("execute stmt1 using @p0").Check(testkit.Rows()) + tk.MustExec("begin") + tk.MustQuery("execute stmt1 using 
@p0").Check(testkit.Rows()) + counter.Write(pb) + cnt := pb.GetCounter().GetValue() + c.Check(cnt, Equals, float64(1)) + tk.MustExec("insert into t1 values(1)") + // Cached plan is invalid now, it is not chosen and removed. + tk.MustQuery("execute stmt1 using @p0").Check(testkit.Rows( + "1", + )) + counter.Write(pb) + cnt = pb.GetCounter().GetValue() + c.Check(cnt, Equals, float64(1)) + tk.MustExec("insert into t2 values(1)") + // Cached plan is chosen, modification on t2 does not impact plan of t1. + tk.MustQuery("execute stmt1 using @p0").Check(testkit.Rows( + "1", + )) + counter.Write(pb) + cnt = pb.GetCounter().GetValue() + c.Check(cnt, Equals, float64(2)) + tk.MustExec("rollback") + // Though cached plan contains UnionScan, it does not impact correctness, so it is reused. + tk.MustQuery("execute stmt1 using @p0").Check(testkit.Rows()) + counter.Write(pb) + cnt = pb.GetCounter().GetValue() + c.Check(cnt, Equals, float64(3)) + + tk.MustExec("prepare stmt2 from 'select * from t1 left join t2 on true where t1.a > ?'") + tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows()) + tk.MustExec("begin") + tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows()) + counter.Write(pb) + cnt = pb.GetCounter().GetValue() + c.Check(cnt, Equals, float64(4)) + tk.MustExec("insert into t1 values(1)") + // Cached plan is invalid now, it is not chosen and removed. + tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows( + "1 <nil>", + )) + counter.Write(pb) + cnt = pb.GetCounter().GetValue() + c.Check(cnt, Equals, float64(4)) + tk.MustExec("insert into t2 values(1)") + // Cached plan is invalid now, it is not chosen and removed. + tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows( + "1 1", + )) + counter.Write(pb) + cnt = pb.GetCounter().GetValue() + c.Check(cnt, Equals, float64(4)) + // Cached plan is reused. 
+ tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows( + "1 1", + )) + counter.Write(pb) + cnt = pb.GetCounter().GetValue() + c.Check(cnt, Equals, float64(5)) + tk.MustExec("rollback") + // Though cached plan contains UnionScan, it does not impact correctness, so it is reused. + tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows()) + counter.Write(pb) + cnt = pb.GetCounter().GetValue() + c.Check(cnt, Equals, float64(6)) +} diff --git a/planner/core/util.go b/planner/core/util.go index 926cea4955..dcfc7c56b0 100644 --- a/planner/core/util.go +++ b/planner/core/util.go @@ -19,7 +19,9 @@ import ( "strings" "github.com/pingcap/parser/ast" + "github.com/pingcap/parser/model" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/set" ) @@ -249,3 +251,18 @@ func extractStringFromStringSet(set set.StringSet) string { sort.Strings(l) return fmt.Sprintf("%s", strings.Join(l, ",")) } + +func tableHasDirtyContent(ctx sessionctx.Context, tableInfo *model.TableInfo) bool { + pi := tableInfo.GetPartitionInfo() + if pi == nil { + return ctx.HasDirtyContent(tableInfo.ID) + } + // Currently, we add UnionScan on every partition even though only one partition's data is changed. + // This is limited by current implementation of Partition Prune. It'll be updated once we modify that part. + for _, partition := range pi.Definitions { + if ctx.HasDirtyContent(partition.ID) { + return true + } + } + return false +} diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 50fb139fb6..fc9f2aac61 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -143,6 +143,7 @@ type StatementContext struct { PessimisticLockWaited int32 LockKeysDuration time.Duration LockKeysCount int32 + TblInfo2UnionScan map[*model.TableInfo]bool } // StmtHints are SessionVars related sql hints.