*: not send tso request when point get with max tso (#11981)
This commit is contained in:
@ -769,7 +769,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
|
||||
// IsPointGetWithPKOrUniqueKeyByAutoCommit returns true when meets following conditions:
|
||||
// 1. ctx is auto commit tagged
|
||||
// 2. txn is not valid
|
||||
// 2. plan is point get by pk, or point get by unique index (no double read)
|
||||
// 3. plan is point get by pk, or point get by unique index (no double read)
|
||||
func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p plannercore.Plan) (bool, error) {
|
||||
// check auto commit
|
||||
if !ctx.GetSessionVars().IsAutocommit() {
|
||||
|
||||
@ -36,9 +36,14 @@ import (
|
||||
func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) (plannercore.Plan, error) {
|
||||
fp := plannercore.TryFastPlan(sctx, node)
|
||||
if fp != nil {
|
||||
if !isPointGetWithoutDoubleRead(sctx, fp) {
|
||||
sctx.PrepareTxnFuture(ctx)
|
||||
}
|
||||
return fp, nil
|
||||
}
|
||||
|
||||
sctx.PrepareTxnFuture(ctx)
|
||||
|
||||
var oriHint *bindinfo.HintsSet
|
||||
if stmtNode, ok := node.(ast.StmtNode); ok {
|
||||
oriHint = addHint(sctx, stmtNode)
|
||||
@ -203,6 +208,18 @@ func handleInvalidBindRecord(ctx context.Context, sctx sessionctx.Context, stmtN
|
||||
}
|
||||
}
|
||||
|
||||
// isPointGetWithoutDoubleRead returns true when meets following conditions:
|
||||
// 1. ctx is auto commit tagged.
|
||||
// 2. plan is point get by pk.
|
||||
func isPointGetWithoutDoubleRead(ctx sessionctx.Context, p plannercore.Plan) bool {
|
||||
if !ctx.GetSessionVars().IsAutocommit() {
|
||||
return false
|
||||
}
|
||||
|
||||
v, ok := p.(*plannercore.PointGetPlan)
|
||||
return ok && v.IndexInfo == nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
plannercore.OptimizeAstNode = Optimize
|
||||
}
|
||||
|
||||
@ -1146,6 +1146,7 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields
|
||||
// NewPrepareExec may need startTS to build the executor, for example prepare statement has subquery in int.
|
||||
// So we have to call PrepareTxnCtx here.
|
||||
s.PrepareTxnCtx(ctx)
|
||||
s.PrepareTxnFuture(ctx)
|
||||
prepareExec := executor.NewPrepareExec(s, executor.GetInfoSchema(s), sql)
|
||||
err = prepareExec.Next(ctx, nil)
|
||||
if err != nil {
|
||||
@ -1785,8 +1786,6 @@ func (s *session) PrepareTxnCtx(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
txnFuture := s.getTxnFuture(ctx)
|
||||
s.txn.changeInvalidToPending(txnFuture)
|
||||
is := domain.GetDomain(s).InfoSchema()
|
||||
s.sessionVars.TxnCtx = &variable.TransactionContext{
|
||||
InfoSchema: is,
|
||||
@ -1807,6 +1806,16 @@ func (s *session) PrepareTxnCtx(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// PrepareTxnFuture uses to try to get txn future.
|
||||
func (s *session) PrepareTxnFuture(ctx context.Context) {
|
||||
if s.txn.validOrPending() {
|
||||
return
|
||||
}
|
||||
|
||||
txnFuture := s.getTxnFuture(ctx)
|
||||
s.txn.changeInvalidToPending(txnFuture)
|
||||
}
|
||||
|
||||
// RefreshTxnCtx implements context.RefreshTxnCtx interface.
|
||||
func (s *session) RefreshTxnCtx(ctx context.Context) error {
|
||||
if err := s.doCommit(ctx); err != nil {
|
||||
|
||||
@ -98,6 +98,8 @@ type Context interface {
|
||||
ReleaseAllTableLocks()
|
||||
// HasLockedTables uses to check whether this session locked any tables.
|
||||
HasLockedTables() bool
|
||||
// PrepareTxnFuture uses to prepare txn by future.
|
||||
PrepareTxnFuture(ctx context.Context)
|
||||
}
|
||||
|
||||
type basicCtxType int
|
||||
|
||||
@ -247,6 +247,10 @@ func (c *Context) HasLockedTables() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// PrepareTxnFuture implements the sessionctx.Context interface.
|
||||
func (c *Context) PrepareTxnFuture(ctx context.Context) {
|
||||
}
|
||||
|
||||
// Close implements the sessionctx.Context interface.
|
||||
func (c *Context) Close() {
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user