173 lines
5.4 KiB
Go
173 lines
5.4 KiB
Go
// Copyright 2022 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package isolation
|
|
|
|
import (
|
|
"math"
|
|
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/tidb/pkg/kv"
|
|
"github.com/pingcap/tidb/pkg/parser/mysql"
|
|
plannercore "github.com/pingcap/tidb/pkg/planner/core"
|
|
"github.com/pingcap/tidb/pkg/planner/core/base"
|
|
"github.com/pingcap/tidb/pkg/sessionctx"
|
|
"github.com/pingcap/tidb/pkg/sessionctx/variable"
|
|
"github.com/pingcap/tidb/pkg/sessiontxn"
|
|
"github.com/pingcap/tidb/pkg/util/logutil"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
var emptyOptimisticTxnContextProvider = OptimisticTxnContextProvider{}
|
|
|
|
// OptimisticTxnContextProvider provides txn context for optimistic transaction
|
|
type OptimisticTxnContextProvider struct {
|
|
baseTxnContextProvider
|
|
optimizeWithMaxTS bool
|
|
}
|
|
|
|
// ResetForNewTxn resets OptimisticTxnContextProvider to an initial state for a new txn
|
|
func (p *OptimisticTxnContextProvider) ResetForNewTxn(sctx sessionctx.Context, causalConsistencyOnly bool) {
|
|
*p = emptyOptimisticTxnContextProvider
|
|
p.sctx = sctx
|
|
p.causalConsistencyOnly = causalConsistencyOnly
|
|
p.onTxnActiveFunc = p.onTxnActive
|
|
p.getStmtReadTSFunc = p.getTxnStartTS
|
|
p.getStmtForUpdateTSFunc = p.getTxnStartTS
|
|
}
|
|
|
|
func (p *OptimisticTxnContextProvider) onTxnActive(
|
|
txn kv.Transaction,
|
|
tp sessiontxn.EnterNewTxnType,
|
|
) {
|
|
sessVars := p.sctx.GetSessionVars()
|
|
sessVars.TxnCtx.CouldRetry = isOptimisticTxnRetryable(sessVars, tp, txn.IsPipelined())
|
|
failpoint.Inject("injectOptimisticTxnRetryable", func(val failpoint.Value) {
|
|
sessVars.TxnCtx.CouldRetry = val.(bool)
|
|
})
|
|
}
|
|
|
|
// isOptimisticTxnRetryable (if returns true) means the transaction could retry.
|
|
// We only consider retry in this optimistic mode.
|
|
// If the session is already in transaction, enable retry or internal SQL could retry.
|
|
// If not, the transaction could always retry, because it should be auto committed transaction.
|
|
// Anyway the retry limit is 0, the transaction could not retry.
|
|
func isOptimisticTxnRetryable(
|
|
sessVars *variable.SessionVars,
|
|
tp sessiontxn.EnterNewTxnType,
|
|
isPipelined bool,
|
|
) bool {
|
|
if tp == sessiontxn.EnterNewTxnDefault {
|
|
return false
|
|
}
|
|
|
|
if isPipelined {
|
|
return false
|
|
}
|
|
|
|
// If retry limit is 0, the transaction could not retry.
|
|
if sessVars.RetryLimit == 0 {
|
|
return false
|
|
}
|
|
|
|
// When `@@tidb_snapshot` is set, it is a ready-only statement and will not cause the errors that should retry a transaction in optimistic mode.
|
|
if sessVars.SnapshotTS != 0 {
|
|
return false
|
|
}
|
|
|
|
// If the session is not InTxn, it is an auto-committed transaction.
|
|
// The auto-committed transaction could always retry.
|
|
if !sessVars.InTxn() {
|
|
return true
|
|
}
|
|
|
|
// The internal transaction could always retry.
|
|
if sessVars.InRestrictedSQL {
|
|
return true
|
|
}
|
|
|
|
// If the retry is enabled, the transaction could retry.
|
|
if !sessVars.DisableTxnAutoRetry {
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update)
|
|
func (p *OptimisticTxnContextProvider) GetStmtReadTS() (uint64, error) {
|
|
// If `math.MaxUint64` is used for point get optimization, it is not necessary to activate the txn.
|
|
// Just return `math.MaxUint64` to save the performance.
|
|
if p.optimizeWithMaxTS {
|
|
return math.MaxUint64, nil
|
|
}
|
|
return p.baseTxnContextProvider.GetStmtReadTS()
|
|
}
|
|
|
|
// GetStmtForUpdateTS returns the read timestamp used by select statement (not for select ... for update)
|
|
func (p *OptimisticTxnContextProvider) GetStmtForUpdateTS() (uint64, error) {
|
|
if p.optimizeWithMaxTS {
|
|
return math.MaxUint64, nil
|
|
}
|
|
return p.baseTxnContextProvider.GetStmtForUpdateTS()
|
|
}
|
|
|
|
// AdviseOptimizeWithPlan providers optimization according to the plan
|
|
// It will use MaxTS as the startTS in autocommit txn for some plans.
|
|
func (p *OptimisticTxnContextProvider) AdviseOptimizeWithPlan(plan any) (err error) {
|
|
if p.optimizeWithMaxTS || p.isTidbSnapshotEnabled() || p.isBeginStmtWithStaleRead() {
|
|
return nil
|
|
}
|
|
|
|
if p.txn != nil {
|
|
// `p.txn != nil` means the txn has already been activated, we should not optimize the startTS because the startTS
|
|
// has already been used.
|
|
return nil
|
|
}
|
|
|
|
realPlan, ok := plan.(base.Plan)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
if execute, ok := plan.(*plannercore.Execute); ok {
|
|
realPlan = execute.Plan
|
|
}
|
|
|
|
ok = plannercore.IsPointGetWithPKOrUniqueKeyByAutoCommit(p.sctx.GetSessionVars(), realPlan)
|
|
|
|
if ok {
|
|
sessVars := p.sctx.GetSessionVars()
|
|
logutil.BgLogger().Debug("init txnStartTS with MaxUint64",
|
|
zap.Uint64("conn", sessVars.ConnectionID),
|
|
zap.String("text", sessVars.StmtCtx.OriginalSQL),
|
|
)
|
|
|
|
if err = p.forcePrepareConstStartTS(math.MaxUint64); err != nil {
|
|
logutil.BgLogger().Error("failed init txnStartTS with MaxUint64",
|
|
zap.Error(err),
|
|
zap.Uint64("conn", sessVars.ConnectionID),
|
|
zap.String("text", sessVars.StmtCtx.OriginalSQL),
|
|
)
|
|
return err
|
|
}
|
|
|
|
p.optimizeWithMaxTS = true
|
|
if sessVars.StmtCtx.Priority == mysql.NoPriority {
|
|
sessVars.StmtCtx.Priority = kv.PriorityHigh
|
|
}
|
|
}
|
|
return nil
|
|
}
|