planner: invalidate cached plan if no UnionScan for tables that need it (#15429)
This commit is contained in:
@ -1642,6 +1642,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
|
||||
} else if vars.StmtCtx.InSelectStmt {
|
||||
sc.PrevAffectedRows = -1
|
||||
}
|
||||
sc.TblInfo2UnionScan = make(map[*model.TableInfo]bool)
|
||||
errCount, warnCount := vars.StmtCtx.NumErrorWarnings()
|
||||
vars.SysErrorCount = errCount
|
||||
vars.SysWarningCount = warnCount
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/parser/ast"
|
||||
"github.com/pingcap/parser/model"
|
||||
"github.com/pingcap/parser/mysql"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/types"
|
||||
@ -125,15 +126,21 @@ func NewPSTMTPlanCacheKey(sessionVars *variable.SessionVars, pstmtID uint32, sch
|
||||
|
||||
// PSTMTPlanCacheValue stores the cached Statement and StmtNode.
|
||||
type PSTMTPlanCacheValue struct {
|
||||
Plan Plan
|
||||
OutPutNames []*types.FieldName
|
||||
Plan Plan
|
||||
OutPutNames []*types.FieldName
|
||||
TblInfo2UnionScan map[*model.TableInfo]bool
|
||||
}
|
||||
|
||||
// NewPSTMTPlanCacheValue creates a SQLCacheValue.
|
||||
func NewPSTMTPlanCacheValue(plan Plan, names []*types.FieldName) *PSTMTPlanCacheValue {
|
||||
func NewPSTMTPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*model.TableInfo]bool) *PSTMTPlanCacheValue {
|
||||
dstMap := make(map[*model.TableInfo]bool)
|
||||
for k, v := range srcMap {
|
||||
dstMap[k] = v
|
||||
}
|
||||
return &PSTMTPlanCacheValue{
|
||||
Plan: plan,
|
||||
OutPutNames: names,
|
||||
Plan: plan,
|
||||
OutPutNames: names,
|
||||
TblInfo2UnionScan: dstMap,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -295,20 +295,32 @@ func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context,
|
||||
if err := e.checkPreparedPriv(ctx, sctx, preparedStmt, is); err != nil {
|
||||
return err
|
||||
}
|
||||
if metrics.ResettablePlanCacheCounterFortTest {
|
||||
metrics.PlanCacheCounter.WithLabelValues("prepare").Inc()
|
||||
} else {
|
||||
planCacheCounter.Inc()
|
||||
}
|
||||
cachedVal := cacheValue.(*PSTMTPlanCacheValue)
|
||||
err := e.rebuildRange(cachedVal.Plan)
|
||||
if err != nil {
|
||||
return err
|
||||
planValid := true
|
||||
for tblInfo, unionScan := range cachedVal.TblInfo2UnionScan {
|
||||
if !unionScan && tableHasDirtyContent(sctx, tblInfo) {
|
||||
planValid = false
|
||||
// TODO we can inject UnionScan into cached plan to avoid invalidating it, though
|
||||
// rebuilding the filters in UnionScan is pretty trivial.
|
||||
sctx.PreparedPlanCache().Delete(cacheKey)
|
||||
break
|
||||
}
|
||||
}
|
||||
if planValid {
|
||||
if metrics.ResettablePlanCacheCounterFortTest {
|
||||
metrics.PlanCacheCounter.WithLabelValues("prepare").Inc()
|
||||
} else {
|
||||
planCacheCounter.Inc()
|
||||
}
|
||||
err := e.rebuildRange(cachedVal.Plan)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.names = cachedVal.OutPutNames
|
||||
e.Plan = cachedVal.Plan
|
||||
stmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest)
|
||||
return nil
|
||||
}
|
||||
e.names = cachedVal.OutPutNames
|
||||
e.Plan = cachedVal.Plan
|
||||
stmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
p, names, err := OptimizeAstNode(ctx, sctx, prepared.Stmt, is)
|
||||
@ -324,7 +336,7 @@ func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context,
|
||||
isRange := e.isRangePartition(p)
|
||||
_, isTableDual := p.(*PhysicalTableDual)
|
||||
if !isTableDual && prepared.UseCache && !isRange {
|
||||
cached := NewPSTMTPlanCacheValue(p, names)
|
||||
cached := NewPSTMTPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan)
|
||||
preparedStmt.NormalizedPlan, preparedStmt.PlanDigest = NormalizePlan(p)
|
||||
stmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest)
|
||||
sctx.PreparedPlanCache().Put(cacheKey, cached)
|
||||
|
||||
@ -2595,8 +2595,9 @@ func getStatsTable(ctx sessionctx.Context, tblInfo *model.TableInfo, pid int64)
|
||||
|
||||
func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, asName *model.CIStr) (LogicalPlan, error) {
|
||||
dbName := tn.Schema
|
||||
sessionVars := b.ctx.GetSessionVars()
|
||||
if dbName.L == "" {
|
||||
dbName = model.NewCIStr(b.ctx.GetSessionVars().CurrentDB)
|
||||
dbName = model.NewCIStr(sessionVars.CurrentDB)
|
||||
}
|
||||
|
||||
tbl, err := b.is.TableByName(dbName, tn.Name)
|
||||
@ -2606,8 +2607,8 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
|
||||
|
||||
tableInfo := tbl.Meta()
|
||||
var authErr error
|
||||
if b.ctx.GetSessionVars().User != nil {
|
||||
authErr = ErrTableaccessDenied.FastGenByArgs("SELECT", b.ctx.GetSessionVars().User.Username, b.ctx.GetSessionVars().User.Hostname, tableInfo.Name.L)
|
||||
if sessionVars.User != nil {
|
||||
authErr = ErrTableaccessDenied.FastGenByArgs("SELECT", sessionVars.User.Username, sessionVars.User.Hostname, tableInfo.Name.L)
|
||||
}
|
||||
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SelectPriv, dbName.L, tableInfo.Name.L, "", authErr)
|
||||
|
||||
@ -2721,7 +2722,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
|
||||
Hidden: col.Hidden,
|
||||
})
|
||||
newCol := &expression.Column{
|
||||
UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(),
|
||||
UniqueID: sessionVars.AllocPlanColumnID(),
|
||||
ID: col.ID,
|
||||
RetType: &col.FieldType,
|
||||
OrigName: names[i].String(),
|
||||
@ -2768,27 +2769,16 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
|
||||
}
|
||||
|
||||
var result LogicalPlan = ds
|
||||
|
||||
needUS := false
|
||||
if pi := tableInfo.GetPartitionInfo(); pi == nil {
|
||||
if b.ctx.HasDirtyContent(tableInfo.ID) {
|
||||
needUS = true
|
||||
}
|
||||
} else {
|
||||
// Currently, we'll add a UnionScan on every partition even though only one partition's data is changed.
|
||||
// This is limited by current implementation of Partition Prune. It'll updated once we modify that part.
|
||||
for _, partition := range pi.Definitions {
|
||||
if b.ctx.HasDirtyContent(partition.ID) {
|
||||
needUS = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if needUS {
|
||||
dirty := tableHasDirtyContent(b.ctx, tableInfo)
|
||||
if dirty {
|
||||
us := LogicalUnionScan{handleCol: handleCol}.Init(b.ctx, b.getSelectOffset())
|
||||
us.SetChildren(ds)
|
||||
result = us
|
||||
}
|
||||
if sessionVars.StmtCtx.TblInfo2UnionScan == nil {
|
||||
sessionVars.StmtCtx.TblInfo2UnionScan = make(map[*model.TableInfo]bool)
|
||||
}
|
||||
sessionVars.StmtCtx.TblInfo2UnionScan[tableInfo] = dirty
|
||||
|
||||
for i, colExpr := range ds.Schema().Columns {
|
||||
var expr expression.Expression
|
||||
|
||||
@ -581,3 +581,97 @@ func (s *testPrepareSerialSuite) TestConstPropAndPPDWithCache(c *C) {
|
||||
"0",
|
||||
))
|
||||
}
|
||||
|
||||
func (s *testPlanSerialSuite) TestPlanCacheUnionScan(c *C) {
|
||||
store, dom, err := newStoreWithBootstrap()
|
||||
c.Assert(err, IsNil)
|
||||
tk := testkit.NewTestKit(c, store)
|
||||
orgEnable := core.PreparedPlanCacheEnabled()
|
||||
defer func() {
|
||||
dom.Close()
|
||||
store.Close()
|
||||
core.SetPreparedPlanCache(orgEnable)
|
||||
}()
|
||||
core.SetPreparedPlanCache(true)
|
||||
tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
|
||||
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
|
||||
})
|
||||
c.Assert(err, IsNil)
|
||||
pb := &dto.Metric{}
|
||||
metrics.ResettablePlanCacheCounterFortTest = true
|
||||
metrics.PlanCacheCounter.Reset()
|
||||
counter := metrics.PlanCacheCounter.WithLabelValues("prepare")
|
||||
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("drop table if exists t1")
|
||||
tk.MustExec("drop table if exists t2")
|
||||
tk.MustExec("create table t1(a int not null)")
|
||||
tk.MustExec("create table t2(a int not null)")
|
||||
tk.MustExec("prepare stmt1 from 'select * from t1 where a > ?'")
|
||||
tk.MustExec("set @p0 = 0")
|
||||
tk.MustQuery("execute stmt1 using @p0").Check(testkit.Rows())
|
||||
tk.MustExec("begin")
|
||||
tk.MustQuery("execute stmt1 using @p0").Check(testkit.Rows())
|
||||
counter.Write(pb)
|
||||
cnt := pb.GetCounter().GetValue()
|
||||
c.Check(cnt, Equals, float64(1))
|
||||
tk.MustExec("insert into t1 values(1)")
|
||||
// Cached plan is invalid now, it is not chosen and removed.
|
||||
tk.MustQuery("execute stmt1 using @p0").Check(testkit.Rows(
|
||||
"1",
|
||||
))
|
||||
counter.Write(pb)
|
||||
cnt = pb.GetCounter().GetValue()
|
||||
c.Check(cnt, Equals, float64(1))
|
||||
tk.MustExec("insert into t2 values(1)")
|
||||
// Cached plan is chosen, modification on t2 does not impact plan of t1.
|
||||
tk.MustQuery("execute stmt1 using @p0").Check(testkit.Rows(
|
||||
"1",
|
||||
))
|
||||
counter.Write(pb)
|
||||
cnt = pb.GetCounter().GetValue()
|
||||
c.Check(cnt, Equals, float64(2))
|
||||
tk.MustExec("rollback")
|
||||
// Though cached plan contains UnionScan, it does not impact correctness, so it is reused.
|
||||
tk.MustQuery("execute stmt1 using @p0").Check(testkit.Rows())
|
||||
counter.Write(pb)
|
||||
cnt = pb.GetCounter().GetValue()
|
||||
c.Check(cnt, Equals, float64(3))
|
||||
|
||||
tk.MustExec("prepare stmt2 from 'select * from t1 left join t2 on true where t1.a > ?'")
|
||||
tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows())
|
||||
tk.MustExec("begin")
|
||||
tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows())
|
||||
counter.Write(pb)
|
||||
cnt = pb.GetCounter().GetValue()
|
||||
c.Check(cnt, Equals, float64(4))
|
||||
tk.MustExec("insert into t1 values(1)")
|
||||
// Cached plan is invalid now, it is not chosen and removed.
|
||||
tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows(
|
||||
"1 <nil>",
|
||||
))
|
||||
counter.Write(pb)
|
||||
cnt = pb.GetCounter().GetValue()
|
||||
c.Check(cnt, Equals, float64(4))
|
||||
tk.MustExec("insert into t2 values(1)")
|
||||
// Cached plan is invalid now, it is not chosen and removed.
|
||||
tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows(
|
||||
"1 1",
|
||||
))
|
||||
counter.Write(pb)
|
||||
cnt = pb.GetCounter().GetValue()
|
||||
c.Check(cnt, Equals, float64(4))
|
||||
// Cached plan is reused.
|
||||
tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows(
|
||||
"1 1",
|
||||
))
|
||||
counter.Write(pb)
|
||||
cnt = pb.GetCounter().GetValue()
|
||||
c.Check(cnt, Equals, float64(5))
|
||||
tk.MustExec("rollback")
|
||||
// Though cached plan contains UnionScan, it does not impact correctness, so it is reused.
|
||||
tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows())
|
||||
counter.Write(pb)
|
||||
cnt = pb.GetCounter().GetValue()
|
||||
c.Check(cnt, Equals, float64(6))
|
||||
}
|
||||
|
||||
@ -19,7 +19,9 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/pingcap/parser/ast"
|
||||
"github.com/pingcap/parser/model"
|
||||
"github.com/pingcap/tidb/expression"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/types"
|
||||
"github.com/pingcap/tidb/util/set"
|
||||
)
|
||||
@ -249,3 +251,18 @@ func extractStringFromStringSet(set set.StringSet) string {
|
||||
sort.Strings(l)
|
||||
return fmt.Sprintf("%s", strings.Join(l, ","))
|
||||
}
|
||||
|
||||
func tableHasDirtyContent(ctx sessionctx.Context, tableInfo *model.TableInfo) bool {
|
||||
pi := tableInfo.GetPartitionInfo()
|
||||
if pi == nil {
|
||||
return ctx.HasDirtyContent(tableInfo.ID)
|
||||
}
|
||||
// Currently, we add UnionScan on every partition even though only one partition's data is changed.
|
||||
// This is limited by current implementation of Partition Prune. It'll be updated once we modify that part.
|
||||
for _, partition := range pi.Definitions {
|
||||
if ctx.HasDirtyContent(partition.ID) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@ -143,6 +143,7 @@ type StatementContext struct {
|
||||
PessimisticLockWaited int32
|
||||
LockKeysDuration time.Duration
|
||||
LockKeysCount int32
|
||||
TblInfo2UnionScan map[*model.TableInfo]bool
|
||||
}
|
||||
|
||||
// StmtHints are SessionVars related sql hints.
|
||||
|
||||
Reference in New Issue
Block a user