779 lines
26 KiB
Go
779 lines
26 KiB
Go
// Copyright 2022 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package isolation
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
|
"github.com/pingcap/tidb/pkg/config"
|
|
"github.com/pingcap/tidb/pkg/domain"
|
|
"github.com/pingcap/tidb/pkg/infoschema"
|
|
"github.com/pingcap/tidb/pkg/kv"
|
|
"github.com/pingcap/tidb/pkg/parser/ast"
|
|
"github.com/pingcap/tidb/pkg/sessionctx"
|
|
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
|
|
"github.com/pingcap/tidb/pkg/sessionctx/variable"
|
|
"github.com/pingcap/tidb/pkg/sessiontxn"
|
|
"github.com/pingcap/tidb/pkg/sessiontxn/internal"
|
|
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
|
|
"github.com/pingcap/tidb/pkg/store/driver/txn"
|
|
"github.com/pingcap/tidb/pkg/table/temptable"
|
|
"github.com/pingcap/tidb/pkg/tablecodec"
|
|
"github.com/pingcap/tidb/pkg/util/logutil"
|
|
"github.com/pingcap/tidb/pkg/util/redact"
|
|
"github.com/pingcap/tidb/pkg/util/tableutil"
|
|
"github.com/pingcap/tidb/pkg/util/tracing"
|
|
tikvstore "github.com/tikv/client-go/v2/kv"
|
|
"github.com/tikv/client-go/v2/oracle"
|
|
"github.com/tikv/client-go/v2/txnkv/transaction"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// baseTxnContextProvider is a base class for the transaction context providers that implement `TxnContextProvider` in different isolation.
|
|
// It provides some common functions below:
|
|
// - Provides a default `OnInitialize` method to initialize its inner state.
|
|
// - Provides some methods like `activateTxn` and `prepareTxn` to manage the inner transaction.
|
|
// - Provides default methods `GetTxnInfoSchema`, `GetStmtReadTS` and `GetStmtForUpdateTS` and return the snapshot information schema or ts when `tidb_snapshot` is set.
|
|
// - Provides other default methods like `Advise`, `OnStmtStart`, `OnStmtRetry` and `OnStmtErrorForNextAction`
|
|
//
|
|
// The subclass can set some inner property of `baseTxnContextProvider` when it is constructed.
|
|
// For example, `getStmtReadTSFunc` and `getStmtForUpdateTSFunc` should be set, and they will be called when `GetStmtReadTS`
|
|
// or `GetStmtForUpdate` to get the timestamp that should be used by the corresponding isolation level.
|
|
type baseTxnContextProvider struct {
|
|
// States that should be initialized when baseTxnContextProvider is created and should not be changed after that
|
|
sctx sessionctx.Context
|
|
causalConsistencyOnly bool
|
|
onInitializeTxnCtx func(*variable.TransactionContext)
|
|
onTxnActiveFunc func(kv.Transaction, sessiontxn.EnterNewTxnType)
|
|
getStmtReadTSFunc func() (uint64, error)
|
|
getStmtForUpdateTSFunc func() (uint64, error)
|
|
|
|
// Runtime states
|
|
ctx context.Context
|
|
infoSchema infoschema.InfoSchema
|
|
txn kv.Transaction
|
|
isTxnPrepared bool
|
|
enterNewTxnType sessiontxn.EnterNewTxnType
|
|
// constStartTS is only used by point get max ts optimization currently.
|
|
// When constStartTS != 0, we use constStartTS directly without fetching it from tso.
|
|
// To save the cpu cycles `PrepareTSFuture` will also not be called when warmup (postpone to activate txn).
|
|
constStartTS uint64
|
|
}
|
|
|
|
// OnInitialize is the hook that should be called when enter a new txn with this provider
|
|
func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn.EnterNewTxnType) (err error) {
|
|
if p.getStmtReadTSFunc == nil || p.getStmtForUpdateTSFunc == nil {
|
|
return errors.New("ts functions should not be nil")
|
|
}
|
|
|
|
p.ctx = ctx
|
|
sessVars := p.sctx.GetSessionVars()
|
|
activeNow := true
|
|
switch tp {
|
|
case sessiontxn.EnterNewTxnDefault:
|
|
// As we will enter a new txn, we need to commit the old txn if it's still valid.
|
|
// There are two main steps here to enter a new txn:
|
|
// 1. prepareTxnWithOracleTS
|
|
// 2. ActivateTxn
|
|
if err := internal.CommitBeforeEnterNewTxn(p.ctx, p.sctx); err != nil {
|
|
return err
|
|
}
|
|
if err := p.prepareTxnWithOracleTS(); err != nil {
|
|
return err
|
|
}
|
|
case sessiontxn.EnterNewTxnWithBeginStmt:
|
|
if !canReuseTxnWhenExplicitBegin(p.sctx) {
|
|
// As we will enter a new txn, we need to commit the old txn if it's still valid.
|
|
// There are two main steps here to enter a new txn:
|
|
// 1. prepareTxnWithOracleTS
|
|
// 2. ActivateTxn
|
|
if err := internal.CommitBeforeEnterNewTxn(p.ctx, p.sctx); err != nil {
|
|
return err
|
|
}
|
|
if err := p.prepareTxnWithOracleTS(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
sessVars.SetInTxn(true)
|
|
case sessiontxn.EnterNewTxnBeforeStmt:
|
|
activeNow = false
|
|
default:
|
|
return errors.Errorf("Unsupported type: %v", tp)
|
|
}
|
|
|
|
p.enterNewTxnType = tp
|
|
p.infoSchema = p.sctx.GetLatestInfoSchema().(infoschema.InfoSchema)
|
|
txnCtx := &variable.TransactionContext{
|
|
TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{
|
|
CreateTime: time.Now(),
|
|
InfoSchema: p.infoSchema,
|
|
TxnScope: sessVars.CheckAndGetTxnScope(),
|
|
},
|
|
}
|
|
if p.onInitializeTxnCtx != nil {
|
|
p.onInitializeTxnCtx(txnCtx)
|
|
}
|
|
sessVars.TxnCtxMu.Lock()
|
|
sessVars.TxnCtx = txnCtx
|
|
sessVars.TxnCtxMu.Unlock()
|
|
if vardef.IsMDLEnabled() {
|
|
sessVars.TxnCtx.EnableMDL = true
|
|
}
|
|
|
|
txn, err := p.sctx.Txn(false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
p.isTxnPrepared = txn.Valid() || p.sctx.GetPreparedTxnFuture() != nil
|
|
if activeNow {
|
|
_, err = p.ActivateTxn()
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// GetTxnInfoSchema returns the information schema used by txn
|
|
func (p *baseTxnContextProvider) GetTxnInfoSchema() infoschema.InfoSchema {
|
|
if is := p.sctx.GetSessionVars().SnapshotInfoschema; is != nil {
|
|
return is.(infoschema.InfoSchema)
|
|
}
|
|
if _, ok := p.infoSchema.(*infoschema.SessionExtendedInfoSchema); !ok {
|
|
p.infoSchema = &infoschema.SessionExtendedInfoSchema{
|
|
InfoSchema: p.infoSchema,
|
|
}
|
|
p.sctx.GetSessionVars().TxnCtx.InfoSchema = p.infoSchema
|
|
}
|
|
return p.infoSchema
|
|
}
|
|
|
|
// GetTxnScope returns the current txn scope
|
|
func (p *baseTxnContextProvider) GetTxnScope() string {
|
|
return p.sctx.GetSessionVars().TxnCtx.TxnScope
|
|
}
|
|
|
|
// GetReadReplicaScope returns the read replica scope
|
|
func (p *baseTxnContextProvider) GetReadReplicaScope() string {
|
|
if txnScope := p.GetTxnScope(); txnScope != kv.GlobalTxnScope && txnScope != "" {
|
|
// In local txn, we should use txnScope as the readReplicaScope
|
|
return txnScope
|
|
}
|
|
|
|
if p.sctx.GetSessionVars().GetReplicaRead().IsClosestRead() {
|
|
// If closest read is set, we should use the scope where instance located.
|
|
return config.GetTxnScopeFromConfig()
|
|
}
|
|
|
|
// When it is not local txn or closet read, we should use global scope
|
|
return kv.GlobalReplicaScope
|
|
}
|
|
|
|
// GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update)
|
|
func (p *baseTxnContextProvider) GetStmtReadTS() (uint64, error) {
|
|
if _, err := p.ActivateTxn(); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
|
|
return snapshotTS, nil
|
|
}
|
|
return p.getStmtReadTSFunc()
|
|
}
|
|
|
|
// GetStmtForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update
|
|
func (p *baseTxnContextProvider) GetStmtForUpdateTS() (uint64, error) {
|
|
if _, err := p.ActivateTxn(); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
|
|
return snapshotTS, nil
|
|
}
|
|
return p.getStmtForUpdateTSFunc()
|
|
}
|
|
|
|
// OnStmtStart is the hook that should be called when a new statement started
|
|
func (p *baseTxnContextProvider) OnStmtStart(ctx context.Context, _ ast.StmtNode) error {
|
|
p.ctx = ctx
|
|
return nil
|
|
}
|
|
|
|
// OnPessimisticStmtStart is the hook that should be called when starts handling a pessimistic DML or
|
|
// a pessimistic select-for-update statements.
|
|
func (p *baseTxnContextProvider) OnPessimisticStmtStart(_ context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
// OnPessimisticStmtEnd is the hook that should be called when finishes handling a pessimistic DML or
|
|
// select-for-update statement.
|
|
func (p *baseTxnContextProvider) OnPessimisticStmtEnd(_ context.Context, _ bool) error {
|
|
return nil
|
|
}
|
|
|
|
// OnStmtRetry is the hook that should be called when a statement is retried internally.
|
|
func (p *baseTxnContextProvider) OnStmtRetry(ctx context.Context) error {
|
|
p.ctx = ctx
|
|
p.sctx.GetSessionVars().TxnCtx.CurrentStmtPessimisticLockCache = nil
|
|
return nil
|
|
}
|
|
|
|
// OnStmtCommit is the hook that should be called when a statement is executed successfully.
|
|
func (p *baseTxnContextProvider) OnStmtCommit(_ context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
// OnStmtRollback is the hook that should be called when a statement fails to execute.
|
|
func (p *baseTxnContextProvider) OnStmtRollback(_ context.Context, _ bool) error {
|
|
return nil
|
|
}
|
|
|
|
// OnLocalTemporaryTableCreated is the hook that should be called when a local temporary table created.
|
|
func (p *baseTxnContextProvider) OnLocalTemporaryTableCreated() {
|
|
p.infoSchema = temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, p.infoSchema)
|
|
p.sctx.GetSessionVars().TxnCtx.InfoSchema = p.infoSchema
|
|
if p.txn != nil && p.txn.Valid() {
|
|
if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema); interceptor != nil {
|
|
p.txn.SetOption(kv.SnapInterceptor, interceptor)
|
|
}
|
|
}
|
|
}
|
|
|
|
// OnStmtErrorForNextAction is the hook that should be called when a new statement get an error
|
|
func (p *baseTxnContextProvider) OnStmtErrorForNextAction(ctx context.Context, point sessiontxn.StmtErrorHandlePoint, err error) (sessiontxn.StmtErrorAction, error) {
|
|
switch point {
|
|
case sessiontxn.StmtErrAfterPessimisticLock:
|
|
// for pessimistic lock error, return the error by default
|
|
return sessiontxn.ErrorAction(err)
|
|
default:
|
|
return sessiontxn.NoIdea()
|
|
}
|
|
}
|
|
|
|
func (p *baseTxnContextProvider) getTxnStartTS() (uint64, error) {
|
|
txn, err := p.ActivateTxn()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return txn.StartTS(), nil
|
|
}
|
|
|
|
// TODO: replace usePresetStartTS with a new method StartTSFromPD to make it clear that
|
|
// the timestamp is not allocated by TSO.
|
|
func (p *baseTxnContextProvider) usePresetStartTS() bool {
|
|
return p.constStartTS != 0 || p.sctx.GetSessionVars().SnapshotTS != 0
|
|
}
|
|
|
|
// ActivateTxn activates the transaction and set the relevant context variables.
|
|
func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
|
|
if p.txn != nil {
|
|
return p.txn, nil
|
|
}
|
|
|
|
if err := p.prepareTxn(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if p.constStartTS != 0 {
|
|
if err := p.replaceTxnTsFuture(sessiontxn.ConstantFuture(p.constStartTS)); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
txnFuture := p.sctx.GetPreparedTxnFuture()
|
|
|
|
// Inject delay before TSO wait for testing maxExecutionTime
|
|
failpoint.Inject("injectTSOWaitDelay", func(val failpoint.Value) {
|
|
if delayMs, ok := val.(int); ok {
|
|
time.Sleep(time.Duration(delayMs) * time.Millisecond)
|
|
}
|
|
})
|
|
|
|
txn, err := txnFuture.Wait(p.ctx, p.sctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sessVars := p.sctx.GetSessionVars()
|
|
sessVars.TxnCtxMu.Lock()
|
|
sessVars.TxnCtx.StartTS = txn.StartTS()
|
|
sessVars.GetRowIDShardGenerator().SetShardStep(int(sessVars.ShardAllocateStep))
|
|
sessVars.TxnCtxMu.Unlock()
|
|
if sessVars.MemDBFootprint != nil {
|
|
sessVars.MemDBFootprint.Detach()
|
|
}
|
|
sessVars.MemDBFootprint = nil
|
|
|
|
if p.enterNewTxnType == sessiontxn.EnterNewTxnBeforeStmt && !sessVars.IsAutocommit() && sessVars.SnapshotTS == 0 {
|
|
sessVars.SetInTxn(true)
|
|
}
|
|
|
|
// verify start_ts is later than any previous commit_ts in the session
|
|
if !p.usePresetStartTS() && sessVars.LastCommitTS > 0 && sessVars.LastCommitTS > sessVars.TxnCtx.StartTS {
|
|
logutil.BgLogger().Error("check session lastCommitTS failed",
|
|
zap.Uint64("lastCommitTS", sessVars.LastCommitTS),
|
|
zap.Uint64("startTS", sessVars.TxnCtx.StartTS),
|
|
zap.String("sql", redact.String(sessVars.EnableRedactLog, sessVars.StmtCtx.OriginalSQL)),
|
|
)
|
|
return nil, fmt.Errorf("txn start_ts:%d is before session last_commit_ts:%d",
|
|
sessVars.TxnCtx.StartTS, sessVars.LastCommitTS)
|
|
}
|
|
|
|
txn.SetVars(sessVars.KVVars)
|
|
|
|
p.SetOptionsOnTxnActive(txn)
|
|
|
|
if p.onTxnActiveFunc != nil {
|
|
p.onTxnActiveFunc(txn, p.enterNewTxnType)
|
|
}
|
|
|
|
p.txn = txn
|
|
return txn, nil
|
|
}
|
|
|
|
// prepareTxn prepares txn with an oracle ts future. If the snapshotTS is set,
|
|
// the txn is prepared with it.
|
|
func (p *baseTxnContextProvider) prepareTxn() error {
|
|
if p.isTxnPrepared {
|
|
return nil
|
|
}
|
|
|
|
if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
|
|
return p.replaceTxnTsFuture(sessiontxn.ConstantFuture(snapshotTS))
|
|
}
|
|
|
|
future := newOracleFuture(p.ctx, p.sctx, p.sctx.GetSessionVars().TxnCtx.TxnScope)
|
|
return p.replaceTxnTsFuture(future)
|
|
}
|
|
|
|
// prepareTxnWithOracleTS
|
|
// The difference between prepareTxnWithOracleTS and prepareTxn is that prepareTxnWithOracleTS
|
|
// does not consider snapshotTS
|
|
func (p *baseTxnContextProvider) prepareTxnWithOracleTS() error {
|
|
if p.isTxnPrepared {
|
|
return nil
|
|
}
|
|
|
|
future := newOracleFuture(p.ctx, p.sctx, p.sctx.GetSessionVars().TxnCtx.TxnScope)
|
|
return p.replaceTxnTsFuture(future)
|
|
}
|
|
|
|
func (p *baseTxnContextProvider) forcePrepareConstStartTS(ts uint64) error {
|
|
if p.txn != nil {
|
|
return errors.New("cannot force prepare const start ts because txn is active")
|
|
}
|
|
p.constStartTS = ts
|
|
p.isTxnPrepared = true
|
|
return nil
|
|
}
|
|
|
|
func (p *baseTxnContextProvider) replaceTxnTsFuture(future oracle.Future) error {
|
|
txn, err := p.sctx.Txn(false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if txn.Valid() {
|
|
return nil
|
|
}
|
|
|
|
txnScope := p.sctx.GetSessionVars().TxnCtx.TxnScope
|
|
if err = p.sctx.PrepareTSFuture(p.ctx, future, txnScope); err != nil {
|
|
return err
|
|
}
|
|
|
|
p.isTxnPrepared = true
|
|
return nil
|
|
}
|
|
|
|
func (p *baseTxnContextProvider) isTidbSnapshotEnabled() bool {
|
|
return p.sctx.GetSessionVars().SnapshotTS != 0
|
|
}
|
|
|
|
// isBeginStmtWithStaleRead indicates whether the current statement is `BeginStmt` type with stale read
|
|
// Because stale read will use `staleread.StalenessTxnContextProvider` for query, so if `staleread.IsStmtStaleness()`
|
|
// returns true in other providers, it means the current statement is `BeginStmt` with stale read
|
|
func (p *baseTxnContextProvider) isBeginStmtWithStaleRead() bool {
|
|
return staleread.IsStmtStaleness(p.sctx)
|
|
}
|
|
|
|
// AdviseWarmup provides warmup for inner state
|
|
func (p *baseTxnContextProvider) AdviseWarmup() error {
|
|
if p.isTxnPrepared || p.isBeginStmtWithStaleRead() {
|
|
// When executing `START TRANSACTION READ ONLY AS OF ...` no need to warmUp
|
|
return nil
|
|
}
|
|
return p.prepareTxn()
|
|
}
|
|
|
|
// AdviseOptimizeWithPlan providers optimization according to the plan
|
|
func (p *baseTxnContextProvider) AdviseOptimizeWithPlan(_ any) error {
|
|
return nil
|
|
}
|
|
|
|
// GetSnapshotWithStmtReadTS gets snapshot with read ts
|
|
func (p *baseTxnContextProvider) GetSnapshotWithStmtReadTS() (kv.Snapshot, error) {
|
|
ts, err := p.GetStmtReadTS()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return p.getSnapshotByTS(ts)
|
|
}
|
|
|
|
// GetSnapshotWithStmtForUpdateTS gets snapshot with for update ts
|
|
func (p *baseTxnContextProvider) GetSnapshotWithStmtForUpdateTS() (kv.Snapshot, error) {
|
|
ts, err := p.GetStmtForUpdateTS()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return p.getSnapshotByTS(ts)
|
|
}
|
|
|
|
// getSnapshotByTS get snapshot from store according to the snapshotTS and set the transaction related
|
|
// options before return
|
|
func (p *baseTxnContextProvider) getSnapshotByTS(snapshotTS uint64) (kv.Snapshot, error) {
|
|
txn, err := p.sctx.Txn(false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
txnCtx := p.sctx.GetSessionVars().TxnCtx
|
|
|
|
var snapshot kv.Snapshot
|
|
if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() && txnCtx.StartTS == snapshotTS {
|
|
snapshot = txn.GetSnapshot()
|
|
} else {
|
|
snapshot = internal.GetSnapshotWithTS(
|
|
p.sctx,
|
|
snapshotTS,
|
|
temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema),
|
|
)
|
|
}
|
|
snapshot.SetOption(kv.ReplicaRead, p.sctx.GetSessionVars().GetReplicaRead())
|
|
|
|
return snapshot, nil
|
|
}
|
|
|
|
func (p *baseTxnContextProvider) SetOptionsOnTxnActive(txn kv.Transaction) {
|
|
sessVars := p.sctx.GetSessionVars()
|
|
|
|
readReplicaType := sessVars.GetReplicaRead()
|
|
if readReplicaType.IsFollowerRead() {
|
|
txn.SetOption(kv.ReplicaRead, readReplicaType)
|
|
}
|
|
|
|
if interceptor := temptable.SessionSnapshotInterceptor(
|
|
p.sctx,
|
|
p.infoSchema,
|
|
); interceptor != nil {
|
|
txn.SetOption(kv.SnapInterceptor, interceptor)
|
|
}
|
|
|
|
if sessVars.StmtCtx.WeakConsistency {
|
|
txn.SetOption(kv.IsolationLevel, kv.RC)
|
|
}
|
|
|
|
internal.SetTxnAssertionLevel(txn, sessVars.AssertionLevel)
|
|
|
|
if p.sctx.GetSessionVars().InRestrictedSQL {
|
|
txn.SetOption(kv.RequestSourceInternal, true)
|
|
}
|
|
|
|
if txn.IsPipelined() {
|
|
txn.SetOption(kv.RequestSourceType, "p-dml")
|
|
} else if tp := p.sctx.GetSessionVars().RequestSourceType; tp != "" {
|
|
txn.SetOption(kv.RequestSourceType, tp)
|
|
}
|
|
|
|
if sessVars.LoadBasedReplicaReadThreshold > 0 {
|
|
txn.SetOption(kv.LoadBasedReplicaReadThreshold, sessVars.LoadBasedReplicaReadThreshold)
|
|
}
|
|
|
|
txn.SetOption(kv.CommitHook, func(info string, _ error) { sessVars.LastTxnInfo = info })
|
|
txn.SetOption(kv.EnableAsyncCommit, sessVars.EnableAsyncCommit)
|
|
txn.SetOption(kv.Enable1PC, sessVars.Enable1PC)
|
|
if sessVars.DiskFullOpt != kvrpcpb.DiskFullOpt_NotAllowedOnFull {
|
|
txn.SetDiskFullOpt(sessVars.DiskFullOpt)
|
|
}
|
|
txn.SetOption(kv.InfoSchema, sessVars.TxnCtx.InfoSchema)
|
|
if sessVars.StmtCtx.KvExecCounter != nil {
|
|
// Bind an interceptor for client-go to count the number of SQL executions of each TiKV.
|
|
txn.SetOption(kv.RPCInterceptor, sessVars.StmtCtx.KvExecCounter.RPCInterceptor())
|
|
}
|
|
txn.SetOption(kv.ResourceGroupTagger, sessVars.StmtCtx.GetResourceGroupTagger())
|
|
txn.SetOption(kv.ExplicitRequestSourceType, sessVars.ExplicitRequestSourceType)
|
|
|
|
if p.causalConsistencyOnly || !sessVars.GuaranteeLinearizability {
|
|
// priority of the sysvar is lower than `start transaction with causal consistency only`
|
|
txn.SetOption(kv.GuaranteeLinearizability, false)
|
|
} else {
|
|
// We needn't ask the TiKV client to guarantee linearizability for auto-commit transactions
|
|
// because the property is naturally holds:
|
|
// We guarantee the commitTS of any transaction must not exceed the next timestamp from the TSO.
|
|
// An auto-commit transaction fetches its startTS from the TSO so its commitTS > its startTS > the commitTS
|
|
// of any previously committed transactions.
|
|
// Additionally, it's required to guarantee linearizability for snapshot read-only transactions though
|
|
// it does take effects on read-only transactions now.
|
|
txn.SetOption(
|
|
kv.GuaranteeLinearizability,
|
|
!sessVars.IsAutocommit() ||
|
|
sessVars.SnapshotTS > 0 ||
|
|
p.enterNewTxnType == sessiontxn.EnterNewTxnDefault ||
|
|
p.enterNewTxnType == sessiontxn.EnterNewTxnWithBeginStmt,
|
|
)
|
|
}
|
|
|
|
txn.SetOption(kv.SessionID, p.sctx.GetSessionVars().ConnectionID)
|
|
|
|
// backgroundGoroutineWaitGroup is pre-initialized before the closure to avoid accessing `p.sctx` in the closure,
|
|
// which may cause unexpected race condition.
|
|
backgroundGoroutineWaitGroup := p.sctx.GetCommitWaitGroup()
|
|
lifecycleHooks := transaction.LifecycleHooks{
|
|
Pre: func() {
|
|
backgroundGoroutineWaitGroup.Add(1)
|
|
},
|
|
Post: func() {
|
|
backgroundGoroutineWaitGroup.Done()
|
|
},
|
|
}
|
|
txn.SetOption(kv.BackgroundGoroutineLifecycleHooks, lifecycleHooks)
|
|
}
|
|
|
|
func (p *baseTxnContextProvider) SetOptionsBeforeCommit(
|
|
txn kv.Transaction, commitTSChecker func(uint64) bool,
|
|
) error {
|
|
sessVars := p.sctx.GetSessionVars()
|
|
// Pipelined dml txn already flushed mutations into stores, so we don't need to set options for them.
|
|
// Instead, some invariants must be checked to avoid anomalies though are unreachable in designed usages.
|
|
if p.txn.IsPipelined() {
|
|
if p.txn.IsPipelined() && !sessVars.TxnCtx.EnableMDL {
|
|
return errors.New("cannot commit pipelined transaction without Metadata Lock: MDL is OFF")
|
|
}
|
|
if len(sessVars.TxnCtx.TemporaryTables) > 0 {
|
|
return errors.New("pipelined dml with temporary tables is not allowed")
|
|
}
|
|
if sessVars.CDCWriteSource != 0 {
|
|
return errors.New("pipelined dml with CDC source is not allowed")
|
|
}
|
|
if commitTSChecker != nil {
|
|
return errors.New("pipelined dml with commitTS checker is not allowed")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// set resource tagger again for internal tasks separated in different transactions
|
|
txn.SetOption(kv.ResourceGroupTagger, sessVars.StmtCtx.GetResourceGroupTagger())
|
|
|
|
// Get the related table or partition IDs.
|
|
relatedPhysicalTables := sessVars.TxnCtx.TableDeltaMap
|
|
// Get accessed temporary tables in the transaction.
|
|
temporaryTables := sessVars.TxnCtx.TemporaryTables
|
|
physicalTableIDs := make([]int64, 0, len(relatedPhysicalTables))
|
|
for id := range relatedPhysicalTables {
|
|
// Schema change on global temporary tables doesn't affect transactions.
|
|
if _, ok := temporaryTables[id]; ok {
|
|
continue
|
|
}
|
|
physicalTableIDs = append(physicalTableIDs, id)
|
|
}
|
|
needCheckSchemaByDelta := true
|
|
// Set this option for 2 phase commit to validate schema lease.
|
|
if sessVars.TxnCtx != nil {
|
|
needCheckSchemaByDelta = !sessVars.TxnCtx.EnableMDL
|
|
}
|
|
|
|
// TODO: refactor SetOption usage to avoid race risk, should detect it in test.
|
|
// The pipelined txn will may be flushed in background, not touch the options to avoid races.
|
|
// to avoid session set overlap the txn set.
|
|
txn.SetOption(
|
|
kv.SchemaChecker,
|
|
domain.NewSchemaChecker(
|
|
p.sctx.GetSchemaValidator(),
|
|
p.GetTxnInfoSchema().SchemaMetaVersion(),
|
|
physicalTableIDs,
|
|
needCheckSchemaByDelta,
|
|
),
|
|
)
|
|
|
|
if sessVars.StmtCtx.KvExecCounter != nil {
|
|
// Bind an interceptor for client-go to count the number of SQL executions of each TiKV.
|
|
txn.SetOption(kv.RPCInterceptor, sessVars.StmtCtx.KvExecCounter.RPCInterceptor())
|
|
}
|
|
|
|
if tables := sessVars.TxnCtx.TemporaryTables; len(tables) > 0 {
|
|
txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables))
|
|
}
|
|
|
|
var txnSource uint64
|
|
if val := txn.GetOption(kv.TxnSource); val != nil {
|
|
txnSource, _ = val.(uint64)
|
|
}
|
|
// If the transaction is started by CDC, we need to set the CDCWriteSource option.
|
|
if sessVars.CDCWriteSource != 0 {
|
|
err := kv.SetCDCWriteSource(&txnSource, sessVars.CDCWriteSource)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
txn.SetOption(kv.TxnSource, txnSource)
|
|
}
|
|
|
|
if commitTSChecker != nil {
|
|
txn.SetOption(kv.CommitTSUpperBoundCheck, commitTSChecker)
|
|
}
|
|
|
|
// Optimization:
|
|
// If an auto-commit optimistic transaction can retry in pessimistic mode,
|
|
// do not resolve locks when prewrite.
|
|
// 1. safety: The locks can be resolved later when it retries in pessimistic mode.
|
|
// 2. benefit: In high-contention scenarios, pessimistic transactions perform better.
|
|
prewriteEncounterLockPolicy := transaction.TryResolvePolicy
|
|
if sessVars.TxnCtx.CouldRetry &&
|
|
sessVars.IsAutocommit() &&
|
|
!sessVars.InTxn() &&
|
|
!sessVars.TxnCtx.IsPessimistic {
|
|
prewriteEncounterLockPolicy = transaction.NoResolvePolicy
|
|
}
|
|
txn.SetOption(kv.PrewriteEncounterLockPolicy, prewriteEncounterLockPolicy)
|
|
|
|
return nil
|
|
}
|
|
|
|
// canReuseTxnWhenExplicitBegin returns whether we should reuse the txn when starting a transaction explicitly
|
|
func canReuseTxnWhenExplicitBegin(sctx sessionctx.Context) bool {
|
|
sessVars := sctx.GetSessionVars()
|
|
txnCtx := sessVars.TxnCtx
|
|
// If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the
|
|
// need to call NewTxn, which commits the existing transaction and begins a new one.
|
|
// If the last un-committed/un-rollback transaction is a time-bounded read-only transaction, we should
|
|
// always create a new transaction.
|
|
// If the variable `tidb_snapshot` is set, we should always create a new transaction because the current txn may be
|
|
// initialized with snapshot ts.
|
|
return txnCtx.History == nil && !txnCtx.IsStaleness && sessVars.SnapshotTS == 0
|
|
}
|
|
|
|
// newOracleFuture creates new future according to the scope and the session context
|
|
func newOracleFuture(ctx context.Context, sctx sessionctx.Context, scope string) oracle.Future {
|
|
r, ctx := tracing.StartRegionEx(ctx, "isolation.newOracleFuture")
|
|
defer r.End()
|
|
|
|
failpoint.Inject("requestTsoFromPD", func() {
|
|
sessiontxn.TsoRequestCountInc(sctx)
|
|
})
|
|
|
|
oracleStore := sctx.GetStore().GetOracle()
|
|
option := &oracle.Option{TxnScope: scope}
|
|
|
|
if sctx.GetSessionVars().UseLowResolutionTSO() {
|
|
return oracleStore.GetLowResolutionTimestampAsync(ctx, option)
|
|
}
|
|
return oracleStore.GetTimestampAsync(ctx, option)
|
|
}
|
|
|
|
// funcFuture adapts a plain function to the oracle.Future interface.
type funcFuture func() (uint64, error)

// Wait calls the wrapped function and returns the ts and error it produces.
func (f funcFuture) Wait() (uint64, error) {
	ts, err := f()
	return ts, err
}
|
|
|
|
// basePessimisticTxnContextProvider extends baseTxnContextProvider with some functionalities that are commonly used in
|
|
// pessimistic transactions.
|
|
type basePessimisticTxnContextProvider struct {
|
|
baseTxnContextProvider
|
|
}
|
|
|
|
// OnPessimisticStmtStart is the hook that should be called when starts handling a pessimistic DML or
|
|
// a pessimistic select-for-update statements.
|
|
func (p *basePessimisticTxnContextProvider) OnPessimisticStmtStart(ctx context.Context) error {
|
|
if err := p.baseTxnContextProvider.OnPessimisticStmtStart(ctx); err != nil {
|
|
return err
|
|
}
|
|
if p.sctx.GetSessionVars().PessimisticTransactionFairLocking &&
|
|
p.txn != nil &&
|
|
p.sctx.GetSessionVars().ConnectionID != 0 &&
|
|
!p.sctx.GetSessionVars().InRestrictedSQL {
|
|
if err := p.txn.StartFairLocking(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// OnPessimisticStmtEnd is the hook that should be called when finishes handling a pessimistic DML or
|
|
// select-for-update statement.
|
|
func (p *basePessimisticTxnContextProvider) OnPessimisticStmtEnd(ctx context.Context, isSuccessful bool) error {
|
|
if err := p.baseTxnContextProvider.OnPessimisticStmtEnd(ctx, isSuccessful); err != nil {
|
|
return err
|
|
}
|
|
if p.txn != nil && p.txn.IsInFairLockingMode() {
|
|
if isSuccessful {
|
|
if err := p.txn.DoneFairLocking(ctx); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
if err := p.txn.CancelFairLocking(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if isSuccessful {
|
|
p.sctx.GetSessionVars().TxnCtx.FlushStmtPessimisticLockCache()
|
|
} else {
|
|
p.sctx.GetSessionVars().TxnCtx.CurrentStmtPessimisticLockCache = nil
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *basePessimisticTxnContextProvider) retryFairLockingIfNeeded(ctx context.Context) error {
|
|
if p.txn != nil && p.txn.IsInFairLockingMode() {
|
|
if err := p.txn.RetryFairLocking(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *basePessimisticTxnContextProvider) cancelFairLockingIfNeeded(ctx context.Context) error {
|
|
if p.txn != nil && p.txn.IsInFairLockingMode() {
|
|
if err := p.txn.CancelFairLocking(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type temporaryTableKVFilter map[int64]tableutil.TempTable
|
|
|
|
func (m temporaryTableKVFilter) IsUnnecessaryKeyValue(
|
|
key, value []byte, flags tikvstore.KeyFlags,
|
|
) (bool, error) {
|
|
tid := tablecodec.DecodeTableID(key)
|
|
if _, ok := m[tid]; ok {
|
|
return true, nil
|
|
}
|
|
|
|
// This is the default filter for all tables.
|
|
defaultFilter := txn.TiDBKVFilter{}
|
|
return defaultFilter.IsUnnecessaryKeyValue(key, value, flags)
|
|
}
|