planner: check infoSchema when builing cacheKey (#34957)
close pingcap/tidb#34974
This commit is contained in:
@ -57,11 +57,9 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
|
||||
}
|
||||
|
||||
ret := &plannercore.PreprocessorReturn{}
|
||||
pe := &plannercore.PreprocessExecuteISUpdate{ExecuteInfoSchemaUpdate: planner.GetExecuteForUpdateReadIS, Node: stmtNode}
|
||||
err := plannercore.Preprocess(c.Ctx,
|
||||
stmtNode,
|
||||
plannercore.WithPreprocessorReturn(ret),
|
||||
plannercore.WithExecuteInfoSchemaUpdate(pe),
|
||||
plannercore.InitTxnContextProvider,
|
||||
)
|
||||
if err != nil {
|
||||
|
||||
@ -335,7 +335,7 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {
|
||||
prepared := preparedObj.PreparedAst
|
||||
delete(vars.PreparedStmtNameToID, e.Name)
|
||||
if plannercore.PreparedPlanCacheEnabled() {
|
||||
cacheKey, err := plannercore.NewPlanCacheKey(vars, preparedObj.StmtText, preparedObj.StmtDB, prepared.SchemaVersion)
|
||||
cacheKey, err := plannercore.NewPlanCacheKey(vars, preparedObj.StmtText, preparedObj.StmtDB, prepared.SchemaVersion, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -56,14 +56,20 @@ func PreparedPlanCacheEnabled() bool {
|
||||
// However, due to some compatibility reasons, we will temporarily keep some system variable-related values in planCacheKey.
|
||||
// At the same time, because these variables have a small impact on plan, we will move them to PlanCacheValue later if necessary.
|
||||
type planCacheKey struct {
|
||||
database string
|
||||
connID uint64
|
||||
stmtText string
|
||||
schemaVersion int64
|
||||
sqlMode mysql.SQLMode
|
||||
timezoneOffset int
|
||||
isolationReadEngines map[kv.StoreType]struct{}
|
||||
selectLimit uint64
|
||||
database string
|
||||
connID uint64
|
||||
stmtText string
|
||||
schemaVersion int64
|
||||
|
||||
// Only be set in rc or for update read and leave it default otherwise.
|
||||
// In Rc or ForUpdateRead, we should check whether the information schema has been changed when using plan cache.
|
||||
// If it changed, we should rebuild the plan. lastUpdatedSchemaVersion help us to decide whether we should rebuild
|
||||
// the plan in rc or for update read.
|
||||
lastUpdatedSchemaVersion int64
|
||||
sqlMode mysql.SQLMode
|
||||
timezoneOffset int
|
||||
isolationReadEngines map[kv.StoreType]struct{}
|
||||
selectLimit uint64
|
||||
|
||||
hash []byte
|
||||
}
|
||||
@ -82,6 +88,7 @@ func (key *planCacheKey) Hash() []byte {
|
||||
key.hash = codec.EncodeInt(key.hash, int64(key.connID))
|
||||
key.hash = append(key.hash, hack.Slice(key.stmtText)...)
|
||||
key.hash = codec.EncodeInt(key.hash, key.schemaVersion)
|
||||
key.hash = codec.EncodeInt(key.hash, key.lastUpdatedSchemaVersion)
|
||||
key.hash = codec.EncodeInt(key.hash, int64(key.sqlMode))
|
||||
key.hash = codec.EncodeInt(key.hash, int64(key.timezoneOffset))
|
||||
if _, ok := key.isolationReadEngines[kv.TiDB]; ok {
|
||||
@ -115,10 +122,16 @@ func SetPstmtIDSchemaVersion(key kvcache.Key, stmtText string, schemaVersion int
|
||||
}
|
||||
|
||||
// NewPlanCacheKey creates a new planCacheKey object.
|
||||
func NewPlanCacheKey(sessionVars *variable.SessionVars, stmtText, stmtDB string, schemaVersion int64) (kvcache.Key, error) {
|
||||
// Note: lastUpdatedSchemaVersion will only be set in the case of rc or for update read in order to
|
||||
// differentiate the cache key. In other cases, it will be 0.
|
||||
func NewPlanCacheKey(sessionVars *variable.SessionVars, stmtText, stmtDB string, schemaVersion int64,
|
||||
lastUpdatedSchemaVersion int64) (kvcache.Key, error) {
|
||||
if stmtText == "" {
|
||||
return nil, errors.New("no statement text")
|
||||
}
|
||||
if schemaVersion == 0 {
|
||||
return nil, errors.New("Schema version uninitialized")
|
||||
}
|
||||
if stmtDB == "" {
|
||||
stmtDB = sessionVars.CurrentDB
|
||||
}
|
||||
@ -127,14 +140,15 @@ func NewPlanCacheKey(sessionVars *variable.SessionVars, stmtText, stmtDB string,
|
||||
_, timezoneOffset = time.Now().In(sessionVars.TimeZone).Zone()
|
||||
}
|
||||
key := &planCacheKey{
|
||||
database: stmtDB,
|
||||
connID: sessionVars.ConnectionID,
|
||||
stmtText: stmtText,
|
||||
schemaVersion: schemaVersion,
|
||||
sqlMode: sessionVars.SQLMode,
|
||||
timezoneOffset: timezoneOffset,
|
||||
isolationReadEngines: make(map[kv.StoreType]struct{}),
|
||||
selectLimit: sessionVars.SelectLimit,
|
||||
database: stmtDB,
|
||||
connID: sessionVars.ConnectionID,
|
||||
stmtText: stmtText,
|
||||
schemaVersion: schemaVersion,
|
||||
lastUpdatedSchemaVersion: lastUpdatedSchemaVersion,
|
||||
sqlMode: sessionVars.SQLMode,
|
||||
timezoneOffset: timezoneOffset,
|
||||
isolationReadEngines: make(map[kv.StoreType]struct{}),
|
||||
selectLimit: sessionVars.SelectLimit,
|
||||
}
|
||||
for k, v := range sessionVars.IsolationReadEngines {
|
||||
key.isolationReadEngines[k] = v
|
||||
|
||||
@ -28,17 +28,17 @@ func TestCacheKey(t *testing.T) {
|
||||
ctx.GetSessionVars().SQLMode = mysql.ModeNone
|
||||
ctx.GetSessionVars().TimeZone = time.UTC
|
||||
ctx.GetSessionVars().ConnectionID = 0
|
||||
key, err := NewPlanCacheKey(ctx.GetSessionVars(), "", "test", 1)
|
||||
key, err := NewPlanCacheKey(ctx.GetSessionVars(), "", "test", 1, 1)
|
||||
if err.Error() != "no statement text" {
|
||||
t.Fail() // no statement text
|
||||
}
|
||||
key, err = NewPlanCacheKey(ctx.GetSessionVars(), "select 1", "", 1)
|
||||
key, err = NewPlanCacheKey(ctx.GetSessionVars(), "select 1", "", 1, 1)
|
||||
if err != nil {
|
||||
t.Fail() // schema can be nil
|
||||
}
|
||||
key, err = NewPlanCacheKey(ctx.GetSessionVars(), "select 1", "test", 1)
|
||||
key, err = NewPlanCacheKey(ctx.GetSessionVars(), "select 1", "test", 1, 1)
|
||||
if err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
require.Equal(t, []byte{0x74, 0x65, 0x73, 0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x73, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x20, 0x31, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x74, 0x69, 0x64, 0x62, 0x74, 0x69, 0x6b, 0x76, 0x74, 0x69, 0x66, 0x6c, 0x61, 0x73, 0x68, 0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, key.Hash())
|
||||
require.Equal(t, []byte{0x74, 0x65, 0x73, 0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x73, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x20, 0x31, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x74, 0x69, 0x64, 0x62, 0x74, 0x69, 0x6b, 0x76, 0x74, 0x69, 0x66, 0x6c, 0x61, 0x73, 0x68, 0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, key.Hash())
|
||||
}
|
||||
|
||||
@ -451,9 +451,21 @@ func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context,
|
||||
stmtCtx.UseCache = prepared.UseCache
|
||||
|
||||
var bindSQL string
|
||||
|
||||
// In rc or for update read, we need the latest schema version to decide whether we need to
|
||||
// rebuild the plan. So we set this value in rc or for update read. In other cases, let it be 0.
|
||||
var latestSchemaVersion int64
|
||||
|
||||
if prepared.UseCache {
|
||||
bindSQL = GetBindSQL4PlanCache(sctx, preparedStmt)
|
||||
if cacheKey, err = NewPlanCacheKey(sctx.GetSessionVars(), preparedStmt.StmtText, preparedStmt.StmtDB, prepared.SchemaVersion); err != nil {
|
||||
if sctx.GetSessionVars().IsIsolation(ast.ReadCommitted) || preparedStmt.ForUpdateRead {
|
||||
// In Rc or ForUpdateRead, we should check if the information schema has been changed since
|
||||
// last time. If it changed, we should rebuild the plan. Here, we use a different and more
|
||||
// up-to-date schema version which can lead plan cache miss and thus, the plan will be rebuilt.
|
||||
latestSchemaVersion = domain.GetDomain(sctx).InfoSchema().SchemaMetaVersion()
|
||||
}
|
||||
if cacheKey, err = NewPlanCacheKey(sctx.GetSessionVars(), preparedStmt.StmtText,
|
||||
preparedStmt.StmtDB, prepared.SchemaVersion, latestSchemaVersion); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -586,7 +598,8 @@ REBUILD:
|
||||
// rebuild key to exclude kv.TiFlash when stmt is not read only
|
||||
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmt, sessVars) {
|
||||
delete(sessVars.IsolationReadEngines, kv.TiFlash)
|
||||
if cacheKey, err = NewPlanCacheKey(sessVars, preparedStmt.StmtText, preparedStmt.StmtDB, prepared.SchemaVersion); err != nil {
|
||||
if cacheKey, err = NewPlanCacheKey(sessVars, preparedStmt.StmtText, preparedStmt.StmtDB,
|
||||
prepared.SchemaVersion, latestSchemaVersion); err != nil {
|
||||
return err
|
||||
}
|
||||
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
|
||||
|
||||
@ -2931,3 +2931,184 @@ func TestPlanCacheWithRCWhenInfoSchemaChange(t *testing.T) {
|
||||
tk2.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 0"))
|
||||
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
|
||||
}
|
||||
|
||||
func TestConsistencyBetweenPrepareExecuteAndNormalSql(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, clean := testkit.CreateMockStore(t)
|
||||
defer clean()
|
||||
|
||||
tk1 := testkit.NewTestKit(t, store)
|
||||
tk2 := testkit.NewTestKit(t, store)
|
||||
tk1.MustExec("use test")
|
||||
tk2.MustExec("use test")
|
||||
tk1.MustExec("drop table if exists t1")
|
||||
tk1.MustExec("create table t1(id int primary key, c int)")
|
||||
tk1.MustExec("insert into t1 values(1, 1), (2, 2)")
|
||||
// prepare text protocol
|
||||
tk1.MustExec("prepare s from 'select * from t1'")
|
||||
// prepare binary protocol
|
||||
stmtID, _, _, err := tk1.Session().PrepareStmt("select * from t1")
|
||||
require.Nil(t, err)
|
||||
tk1.MustExec("set tx_isolation='READ-COMMITTED'")
|
||||
tk1.MustExec("begin pessimistic")
|
||||
tk2.MustExec("set tx_isolation='READ-COMMITTED'")
|
||||
tk2.MustExec("begin pessimistic")
|
||||
|
||||
// Execute using sql
|
||||
tk1.MustQuery("execute s").Check(testkit.Rows("1 1", "2 2"))
|
||||
// Execute using binary
|
||||
rs, err := tk1.Session().ExecutePreparedStmt(ctx, stmtID, []types.Datum{})
|
||||
require.Nil(t, err)
|
||||
tk1.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 1", "2 2"))
|
||||
// Normal sql
|
||||
tk1.MustQuery("select * from t1").Check(testkit.Rows("1 1", "2 2"))
|
||||
|
||||
// Change infoSchema
|
||||
tk2.MustExec("alter table t1 drop column c")
|
||||
tk2.MustExec("insert into t1 values (3)")
|
||||
// Execute using sql
|
||||
tk1.MustQuery("execute s").Check(testkit.Rows("1 1", "2 2", "3 <nil>"))
|
||||
// Execute using binary
|
||||
rs, err = tk1.Session().ExecutePreparedStmt(ctx, stmtID, []types.Datum{})
|
||||
require.Nil(t, err)
|
||||
tk1.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 1", "2 2", "3 <nil>"))
|
||||
// Normal sql
|
||||
tk1.MustQuery("select * from t1").Check(testkit.Rows("1 1", "2 2", "3 <nil>"))
|
||||
tk1.MustExec("commit")
|
||||
|
||||
// After beginning a new txn, the infoSchema should be the latest
|
||||
tk1.MustExec("begin pessimistic")
|
||||
tk1.MustQuery("select * from t1").Check(testkit.Rows("1", "2", "3"))
|
||||
|
||||
}
|
||||
|
||||
func verifyCache(ctx context.Context, t *testing.T, tk1 *testkit.TestKit, tk2 *testkit.TestKit, stmtID uint32) {
|
||||
// Cache miss in the firs time.
|
||||
tk1.MustExec("execute s")
|
||||
tk1.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
|
||||
|
||||
// This time, the cache will be hit.
|
||||
_, err := tk1.Session().ExecutePreparedStmt(ctx, stmtID, []types.Datum{})
|
||||
require.Nil(t, err)
|
||||
tk1.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
|
||||
tk1.MustExec("execute s")
|
||||
tk1.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
|
||||
|
||||
// Change infoSchema version which will make the plan cache invalid in the next execute
|
||||
tk2.MustExec("alter table t1 drop column c")
|
||||
|
||||
tk1.MustExec("execute s")
|
||||
tk1.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
|
||||
// Now the plan cache will be valid
|
||||
_, err = tk1.Session().ExecutePreparedStmt(ctx, stmtID, []types.Datum{})
|
||||
require.Nil(t, err)
|
||||
tk1.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
|
||||
}
|
||||
|
||||
func TestCacheHitInRc(t *testing.T) {
|
||||
orgEnable := core.PreparedPlanCacheEnabled()
|
||||
defer func() {
|
||||
core.SetPreparedPlanCache(orgEnable)
|
||||
}()
|
||||
core.SetPreparedPlanCache(true)
|
||||
|
||||
ctx := context.Background()
|
||||
store, clean := testkit.CreateMockStore(t)
|
||||
defer clean()
|
||||
|
||||
tk1 := testkit.NewTestKit(t, store)
|
||||
tk2 := testkit.NewTestKit(t, store)
|
||||
tk1.MustExec("use test")
|
||||
tk2.MustExec("use test")
|
||||
tk1.MustExec("drop table if exists t1")
|
||||
tk1.MustExec("create table t1(id int primary key, c int)")
|
||||
tk1.MustExec("insert into t1 values(1, 1), (2, 2)")
|
||||
// prepare text protocol
|
||||
tk1.MustExec("prepare s from 'select * from t1'")
|
||||
// prepare binary protocol
|
||||
stmtID, _, _, err := tk1.Session().PrepareStmt("select * from t1")
|
||||
require.Nil(t, err)
|
||||
|
||||
// Test for RC
|
||||
tk1.MustExec("set tx_isolation='READ-COMMITTED'")
|
||||
tk1.MustExec("begin pessimistic")
|
||||
|
||||
// Verify for the RC isolation
|
||||
verifyCache(ctx, t, tk1, tk2, stmtID)
|
||||
tk1.MustExec("rollback")
|
||||
}
|
||||
|
||||
func TestCacheHitInForUpdateRead(t *testing.T) {
|
||||
orgEnable := core.PreparedPlanCacheEnabled()
|
||||
defer func() {
|
||||
core.SetPreparedPlanCache(orgEnable)
|
||||
}()
|
||||
core.SetPreparedPlanCache(true)
|
||||
|
||||
ctx := context.Background()
|
||||
store, clean := testkit.CreateMockStore(t)
|
||||
defer clean()
|
||||
|
||||
tk1 := testkit.NewTestKit(t, store)
|
||||
tk2 := testkit.NewTestKit(t, store)
|
||||
tk1.MustExec("use test")
|
||||
tk2.MustExec("use test")
|
||||
tk1.MustExec("drop table if exists t1")
|
||||
tk1.MustExec("create table t1(id int primary key, c int)")
|
||||
tk1.MustExec("insert into t1 values(1, 1), (2, 2)")
|
||||
|
||||
tk1.MustExec("prepare s from 'select * from t1 where id = 1 for update'")
|
||||
stmtID, _, _, err := tk1.Session().PrepareStmt("select * from t1 where id = 1 for update")
|
||||
require.Nil(t, err)
|
||||
tk1.MustExec("begin pessimistic")
|
||||
|
||||
// Verify for the for update read
|
||||
verifyCache(ctx, t, tk1, tk2, stmtID)
|
||||
tk1.MustExec("rollback")
|
||||
}
|
||||
|
||||
func TestPointGetForUpdateAutoCommitCache(t *testing.T) {
|
||||
orgEnable := core.PreparedPlanCacheEnabled()
|
||||
defer func() {
|
||||
core.SetPreparedPlanCache(orgEnable)
|
||||
}()
|
||||
core.SetPreparedPlanCache(true)
|
||||
|
||||
ctx := context.Background()
|
||||
store, clean := testkit.CreateMockStore(t)
|
||||
defer clean()
|
||||
|
||||
tk1 := testkit.NewTestKit(t, store)
|
||||
tk2 := testkit.NewTestKit(t, store)
|
||||
tk1.MustExec("use test")
|
||||
tk2.MustExec("use test")
|
||||
tk1.MustExec("drop table if exists t1")
|
||||
tk1.MustExec("create table t1(id int primary key, c int)")
|
||||
tk1.MustExec("insert into t1 values(1, 1), (2, 2)")
|
||||
|
||||
tk1.MustExec("prepare s from 'select * from t1 where id = 1 for update'")
|
||||
stmtID, _, _, err := tk1.Session().PrepareStmt("select * from t1 where id = 1 for update")
|
||||
require.Nil(t, err)
|
||||
rs, err := tk1.Session().ExecutePreparedStmt(ctx, stmtID, []types.Datum{})
|
||||
require.Nil(t, err)
|
||||
tk1.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 1"))
|
||||
tk1.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
|
||||
|
||||
rs, err = tk1.Session().ExecutePreparedStmt(ctx, stmtID, []types.Datum{})
|
||||
require.Nil(t, err)
|
||||
tk1.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 1"))
|
||||
tk1.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
|
||||
|
||||
tk2.MustExec("alter table t1 drop column c")
|
||||
tk2.MustExec("update t1 set id = 10 where id = 1")
|
||||
|
||||
rs, err = tk1.Session().ExecutePreparedStmt(ctx, stmtID, []types.Datum{})
|
||||
require.Nil(t, err)
|
||||
tk1.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows())
|
||||
tk1.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
|
||||
|
||||
rs, err = tk1.Session().ExecutePreparedStmt(ctx, stmtID, []types.Datum{})
|
||||
require.Nil(t, err)
|
||||
tk1.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows())
|
||||
tk1.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
|
||||
}
|
||||
|
||||
@ -75,13 +75,6 @@ func WithPreprocessorReturn(ret *PreprocessorReturn) PreprocessOpt {
|
||||
}
|
||||
}
|
||||
|
||||
// WithExecuteInfoSchemaUpdate return a PreprocessOpt to update the `Execute` infoSchema under some conditions.
|
||||
func WithExecuteInfoSchemaUpdate(pe *PreprocessExecuteISUpdate) PreprocessOpt {
|
||||
return func(p *preprocessor) {
|
||||
p.PreprocessExecuteISUpdate = pe
|
||||
}
|
||||
}
|
||||
|
||||
// TryAddExtraLimit trys to add an extra limit for SELECT or UNION statement when sql_select_limit is set.
|
||||
func TryAddExtraLimit(ctx sessionctx.Context, node ast.StmtNode) ast.StmtNode {
|
||||
if ctx.GetSessionVars().SelectLimit == math.MaxUint64 || ctx.GetSessionVars().InRestrictedSQL {
|
||||
@ -172,12 +165,6 @@ type PreprocessorReturn struct {
|
||||
ReadReplicaScope string
|
||||
}
|
||||
|
||||
// PreprocessExecuteISUpdate is used to update information schema for special Execute statement in the preprocessor.
|
||||
type PreprocessExecuteISUpdate struct {
|
||||
ExecuteInfoSchemaUpdate func(node ast.Node, sctx sessionctx.Context) infoschema.InfoSchema
|
||||
Node ast.Node
|
||||
}
|
||||
|
||||
// preprocessWith is used to record info from WITH statements like CTE name.
|
||||
type preprocessWith struct {
|
||||
cteCanUsed []string
|
||||
@ -201,7 +188,6 @@ type preprocessor struct {
|
||||
|
||||
// values that may be returned
|
||||
*PreprocessorReturn
|
||||
*PreprocessExecuteISUpdate
|
||||
err error
|
||||
}
|
||||
|
||||
@ -1696,13 +1682,7 @@ func (p *preprocessor) ensureInfoSchema() infoschema.InfoSchema {
|
||||
if p.InfoSchema != nil {
|
||||
return p.InfoSchema
|
||||
}
|
||||
// `Execute` under some conditions need to see the latest information schema.
|
||||
if p.PreprocessExecuteISUpdate != nil {
|
||||
if newInfoSchema := p.ExecuteInfoSchemaUpdate(p.Node, p.ctx); newInfoSchema != nil {
|
||||
p.InfoSchema = newInfoSchema
|
||||
return p.InfoSchema
|
||||
}
|
||||
}
|
||||
|
||||
p.InfoSchema = p.ctx.GetInfoSchema().(infoschema.InfoSchema)
|
||||
return p.InfoSchema
|
||||
}
|
||||
|
||||
@ -41,7 +41,6 @@ import (
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/sessionctx/stmtctx"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/table/temptable"
|
||||
"github.com/pingcap/tidb/types"
|
||||
"github.com/pingcap/tidb/util/hint"
|
||||
"github.com/pingcap/tidb/util/logutil"
|
||||
@ -63,30 +62,6 @@ func IsReadOnly(node ast.Node, vars *variable.SessionVars) bool {
|
||||
return ast.IsReadOnly(node)
|
||||
}
|
||||
|
||||
// GetExecuteForUpdateReadIS is used to check whether the statement is `execute` and target statement has a forUpdateRead flag.
|
||||
// If so, we will return the latest information schema.
|
||||
func GetExecuteForUpdateReadIS(node ast.Node, sctx sessionctx.Context) infoschema.InfoSchema {
|
||||
if execStmt, isExecStmt := node.(*ast.ExecuteStmt); isExecStmt {
|
||||
vars := sctx.GetSessionVars()
|
||||
execID := execStmt.ExecID
|
||||
if execStmt.Name != "" {
|
||||
execID = vars.PreparedStmtNameToID[execStmt.Name]
|
||||
}
|
||||
if preparedPointer, ok := vars.PreparedStmts[execID]; ok {
|
||||
checkSchema := vars.IsIsolation(ast.ReadCommitted)
|
||||
if !checkSchema {
|
||||
preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt)
|
||||
checkSchema = ok && preparedObj.ForUpdateRead
|
||||
}
|
||||
if checkSchema {
|
||||
is := domain.GetDomain(sctx).InfoSchema()
|
||||
return temptable.AttachLocalTemporaryTableInfoSchema(sctx, is)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func matchSQLBinding(sctx sessionctx.Context, stmtNode ast.StmtNode) (bindRecord *bindinfo.BindRecord, scope string, matched bool) {
|
||||
useBinding := sctx.GetSessionVars().UsePlanBaselines
|
||||
if !useBinding || stmtNode == nil {
|
||||
|
||||
@ -165,7 +165,8 @@ func (ts *TiDBStatement) Close() error {
|
||||
if !ok {
|
||||
return errors.Errorf("invalid CachedPrepareStmt type")
|
||||
}
|
||||
cacheKey, err := core.NewPlanCacheKey(ts.ctx.GetSessionVars(), preparedObj.StmtText, preparedObj.StmtDB, preparedObj.PreparedAst.SchemaVersion)
|
||||
cacheKey, err := core.NewPlanCacheKey(ts.ctx.GetSessionVars(), preparedObj.StmtText, preparedObj.StmtDB,
|
||||
preparedObj.PreparedAst.SchemaVersion, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -334,7 +334,7 @@ func (s *session) cleanRetryInfo() {
|
||||
if ok {
|
||||
preparedAst = preparedObj.PreparedAst
|
||||
stmtText, stmtDB = preparedObj.StmtText, preparedObj.StmtDB
|
||||
cacheKey, err = plannercore.NewPlanCacheKey(s.sessionVars, stmtText, stmtDB, preparedAst.SchemaVersion)
|
||||
cacheKey, err = plannercore.NewPlanCacheKey(s.sessionVars, stmtText, stmtDB, preparedAst.SchemaVersion, 0)
|
||||
if err != nil {
|
||||
logutil.Logger(s.currentCtx).Warn("clean cached plan failed", zap.Error(err))
|
||||
return
|
||||
@ -2304,7 +2304,7 @@ func (s *session) cachedPointPlanExec(ctx context.Context,
|
||||
}
|
||||
|
||||
// IsCachedExecOk check if we can execute using plan cached in prepared structure
|
||||
// Be careful for the short path, current precondition is ths cached plan satisfying
|
||||
// Be careful with the short path, current precondition is ths cached plan satisfying
|
||||
// IsPointGetWithPKOrUniqueKeyByAutoCommit
|
||||
func (s *session) IsCachedExecOk(ctx context.Context, preparedStmt *plannercore.CachedPrepareStmt, isStaleness bool) (bool, error) {
|
||||
prepared := preparedStmt.PreparedAst
|
||||
@ -2383,9 +2383,6 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else if s.sessionVars.IsIsolation(ast.ReadCommitted) || preparedStmt.ForUpdateRead {
|
||||
is = domain.GetDomain(s).InfoSchema()
|
||||
is = temptable.AttachLocalTemporaryTableInfoSchema(s, is)
|
||||
} else {
|
||||
is = s.GetInfoSchema().(infoschema.InfoSchema)
|
||||
}
|
||||
|
||||
@ -688,16 +688,15 @@ func TestTxnContextPreparedStmtWithForUpdate(t *testing.T) {
|
||||
tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 11"))
|
||||
})
|
||||
|
||||
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, do.InfoSchema())
|
||||
path := append([]string{"assertTxnManagerInPreparedStmtExec"}, normalPathRecords...)
|
||||
doWithCheckPath(t, se, path, func() {
|
||||
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID1, nil)
|
||||
require.NoError(t, err)
|
||||
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 11 100"))
|
||||
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 11"))
|
||||
})
|
||||
|
||||
doWithCheckPath(t, se, normalPathRecords, func() {
|
||||
tk.MustQuery("execute s").Check(testkit.Rows("1 11 100"))
|
||||
tk.MustQuery("execute s").Check(testkit.Rows("1 11"))
|
||||
})
|
||||
|
||||
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
||||
|
||||
Reference in New Issue
Block a user