txn: Use TransactionOption in store/tikv module (#23255)
This commit is contained in:
7
kv/kv.go
7
kv/kv.go
@ -341,6 +341,8 @@ type Driver interface {
|
||||
}
|
||||
|
||||
// TransactionOption indicates the option when beginning a transaction
|
||||
// `TxnScope` must be set for each object
|
||||
// Every other fields are optional, but currently at most one of them can be set
|
||||
type TransactionOption struct {
|
||||
TxnScope string
|
||||
StartTS *uint64
|
||||
@ -349,6 +351,11 @@ type TransactionOption struct {
|
||||
MaxPrevSec *uint64
|
||||
}
|
||||
|
||||
// DefaultTransactionOption creates a default TransactionOption, ie. Work in GlobalTxnScope and get start ts when got used
|
||||
func DefaultTransactionOption() TransactionOption {
|
||||
return TransactionOption{TxnScope: oracle.GlobalTxnScope}
|
||||
}
|
||||
|
||||
// SetMaxPrevSec set maxPrevSec
|
||||
func (to TransactionOption) SetMaxPrevSec(maxPrevSec uint64) TransactionOption {
|
||||
to.MaxPrevSec = &maxPrevSec
|
||||
|
||||
@ -1931,7 +1931,7 @@ func (s *session) NewTxn(ctx context.Context) error {
|
||||
zap.String("txnScope", txnScope))
|
||||
}
|
||||
|
||||
txn, err := s.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(s.sessionVars.CheckAndGetTxnScope()))
|
||||
txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -2728,7 +2728,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error {
|
||||
}
|
||||
|
||||
// no need to get txn from txnFutureCh since txn should init with startTs
|
||||
txn, err := s.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS))
|
||||
txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -2761,22 +2761,22 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc
|
||||
txnScope := s.GetSessionVars().CheckAndGetTxnScope()
|
||||
switch option.Mode {
|
||||
case ast.TimestampBoundReadTimestamp:
|
||||
txn, err = s.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(txnScope).SetStartTs(option.StartTS))
|
||||
txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetStartTs(option.StartTS))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case ast.TimestampBoundExactStaleness:
|
||||
txn, err = s.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(txnScope).SetPrevSec(option.PrevSec))
|
||||
txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case ast.TimestampBoundMaxStaleness:
|
||||
txn, err = s.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec))
|
||||
txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case ast.TimestampBoundMinReadTimestamp:
|
||||
txn, err = s.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(txnScope).SetMinStartTS(option.StartTS))
|
||||
txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -353,14 +353,14 @@ type txnFuture struct {
|
||||
func (tf *txnFuture) wait() (kv.Transaction, error) {
|
||||
startTS, err := tf.future.Wait()
|
||||
if err == nil {
|
||||
return tf.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(tf.txnScope).SetStartTs(startTS))
|
||||
return tf.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(tf.txnScope).SetStartTs(startTS))
|
||||
} else if config.GetGlobalConfig().Store == "unistore" {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logutil.BgLogger().Warn("wait tso failed", zap.Error(err))
|
||||
// It would retry get timestamp.
|
||||
return tf.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(tf.txnScope))
|
||||
return tf.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(tf.txnScope))
|
||||
}
|
||||
|
||||
func (s *session) getTxnFuture(ctx context.Context) *txnFuture {
|
||||
|
||||
@ -30,7 +30,6 @@ import (
|
||||
"github.com/pingcap/tidb/store/gcworker"
|
||||
"github.com/pingcap/tidb/store/tikv"
|
||||
"github.com/pingcap/tidb/store/tikv/config"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
"github.com/pingcap/tidb/store/tikv/util"
|
||||
"github.com/pingcap/tidb/util/logutil"
|
||||
pd "github.com/tikv/pd/client"
|
||||
@ -306,27 +305,10 @@ func (s *tikvStore) Begin() (kv.Transaction, error) {
|
||||
|
||||
// BeginWithOption begins a transaction with given option
|
||||
func (s *tikvStore) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) {
|
||||
txnScope := option.TxnScope
|
||||
if txnScope == "" {
|
||||
txnScope = oracle.GlobalTxnScope
|
||||
}
|
||||
var txn *tikv.KVTxn
|
||||
var err error
|
||||
if option.StartTS != nil {
|
||||
txn, err = s.BeginWithStartTS(txnScope, *option.StartTS)
|
||||
} else if option.PrevSec != nil {
|
||||
txn, err = s.BeginWithExactStaleness(txnScope, *option.PrevSec)
|
||||
} else if option.MaxPrevSec != nil {
|
||||
txn, err = s.BeginWithMaxPrevSec(txnScope, *option.MaxPrevSec)
|
||||
} else if option.MinStartTS != nil {
|
||||
txn, err = s.BeginWithMinStartTS(txnScope, *option.MinStartTS)
|
||||
} else {
|
||||
txn, err = s.BeginWithTxnScope(txnScope)
|
||||
}
|
||||
txn, err := s.KVStore.BeginWithOption(option)
|
||||
if err != nil {
|
||||
return nil, txn_driver.ToTiDBErr(err)
|
||||
}
|
||||
|
||||
return txn_driver.NewTiKVTxn(txn), err
|
||||
}
|
||||
|
||||
|
||||
@ -22,7 +22,6 @@ import (
|
||||
driver "github.com/pingcap/tidb/store/driver/txn"
|
||||
"github.com/pingcap/tidb/store/tikv"
|
||||
"github.com/pingcap/tidb/store/tikv/config"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
)
|
||||
|
||||
// Wraps tikv.KVStore and make it compatible with kv.Storage.
|
||||
@ -85,20 +84,7 @@ func (s *mockStorage) ShowStatus(ctx context.Context, key string) (interface{},
|
||||
|
||||
// BeginWithOption begins a transaction with given option
|
||||
func (s *mockStorage) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) {
|
||||
txnScope := option.TxnScope
|
||||
if txnScope == "" {
|
||||
txnScope = oracle.GlobalTxnScope
|
||||
}
|
||||
if option.StartTS != nil {
|
||||
return newTiKVTxn(s.BeginWithStartTS(txnScope, *option.StartTS))
|
||||
} else if option.PrevSec != nil {
|
||||
return newTiKVTxn(s.BeginWithExactStaleness(txnScope, *option.PrevSec))
|
||||
} else if option.MaxPrevSec != nil {
|
||||
return newTiKVTxn(s.BeginWithMaxPrevSec(txnScope, *option.MaxPrevSec))
|
||||
} else if option.MinStartTS != nil {
|
||||
return newTiKVTxn(s.BeginWithMinStartTS(txnScope, *option.MinStartTS))
|
||||
}
|
||||
return newTiKVTxn(s.BeginWithTxnScope(txnScope))
|
||||
return newTiKVTxn(s.KVStore.BeginWithOption(option))
|
||||
}
|
||||
|
||||
// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
|
||||
|
||||
122
store/tikv/extract_start_ts_test.go
Normal file
122
store/tikv/extract_start_ts_test.go
Normal file
@ -0,0 +1,122 @@
|
||||
// Copyright 2021 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tikv
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/store/mockstore/unistore"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
"github.com/pingcap/tidb/store/tikv/tikvrpc"
|
||||
)
|
||||
|
||||
type extractStartTsSuite struct {
|
||||
store *KVStore
|
||||
}
|
||||
|
||||
var _ = Suite(&extractStartTsSuite{})
|
||||
|
||||
func (s *extractStartTsSuite) SetUpTest(c *C) {
|
||||
client, pdClient, cluster, err := unistore.New("")
|
||||
c.Assert(err, IsNil)
|
||||
unistore.BootstrapWithSingleStore(cluster)
|
||||
store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0)
|
||||
c.Assert(err, IsNil)
|
||||
store.regionCache.storeMu.stores[2] = &Store{
|
||||
storeID: 2,
|
||||
storeType: tikvrpc.TiKV,
|
||||
state: uint64(resolved),
|
||||
labels: []*metapb.StoreLabel{
|
||||
{
|
||||
Key: DCLabelKey,
|
||||
Value: oracle.LocalTxnScope,
|
||||
},
|
||||
},
|
||||
}
|
||||
store.regionCache.storeMu.stores[3] = &Store{
|
||||
storeID: 3,
|
||||
storeType: tikvrpc.TiKV,
|
||||
state: uint64(resolved),
|
||||
labels: []*metapb.StoreLabel{{
|
||||
Key: DCLabelKey,
|
||||
Value: "Some Random Label",
|
||||
}},
|
||||
}
|
||||
store.resolveTSMu.resolveTS[2] = 102
|
||||
store.resolveTSMu.resolveTS[3] = 101
|
||||
s.store = store
|
||||
}
|
||||
|
||||
func (s *extractStartTsSuite) TestExtractStartTs(c *C) {
|
||||
i := uint64(100)
|
||||
cases := []kv.TransactionOption{
|
||||
// StartTS setted
|
||||
{TxnScope: oracle.GlobalTxnScope, StartTS: &i, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil},
|
||||
// PrevSec setted
|
||||
{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: &i, MinStartTS: nil, MaxPrevSec: nil},
|
||||
// MinStartTS setted, global
|
||||
{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil},
|
||||
// MinStartTS setted, local
|
||||
{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil},
|
||||
// MaxPrevSec setted
|
||||
// however we need to add more cases to check the behavior when it fall backs to MinStartTS setted
|
||||
// see `TestMaxPrevSecFallback`
|
||||
{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i},
|
||||
// nothing setted
|
||||
{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil},
|
||||
}
|
||||
bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
|
||||
stalenessTimestamp, _ := s.store.getStalenessTimestamp(bo, oracle.GlobalTxnScope, 100)
|
||||
expectedTs := []uint64{
|
||||
100,
|
||||
stalenessTimestamp,
|
||||
|
||||
101,
|
||||
102,
|
||||
|
||||
stalenessTimestamp,
|
||||
// it's too hard to figure out the value `getTimestampWithRetry` returns
|
||||
// so we just check whether it is greater than stalenessTimestamp
|
||||
0,
|
||||
}
|
||||
for i, cs := range cases {
|
||||
expected := expectedTs[i]
|
||||
result, _ := extractStartTs(s.store, cs)
|
||||
if expected == 0 {
|
||||
c.Assert(result, Greater, stalenessTimestamp)
|
||||
} else {
|
||||
c.Assert(result, Equals, expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *extractStartTsSuite) TestMaxPrevSecFallback(c *C) {
|
||||
s.store.resolveTSMu.resolveTS[2] = 0x8000000000000002
|
||||
s.store.resolveTSMu.resolveTS[3] = 0x8000000000000001
|
||||
|
||||
i := uint64(100)
|
||||
cases := []kv.TransactionOption{
|
||||
{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i},
|
||||
{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i},
|
||||
}
|
||||
expectedTs := []uint64{0x8000000000000001, 0x8000000000000002}
|
||||
for i, cs := range cases {
|
||||
expected := expectedTs[i]
|
||||
result, _ := extractStartTs(s.store, cs)
|
||||
c.Assert(result, Equals, expected)
|
||||
}
|
||||
}
|
||||
@ -26,7 +26,7 @@ import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/failpoint"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
tidbkv "github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/store/tikv/config"
|
||||
tikverr "github.com/pingcap/tidb/store/tikv/error"
|
||||
"github.com/pingcap/tidb/store/tikv/kv"
|
||||
@ -184,72 +184,12 @@ func (s *KVStore) runSafePointChecker() {
|
||||
|
||||
// Begin a global transaction.
|
||||
func (s *KVStore) Begin() (*KVTxn, error) {
|
||||
return s.BeginWithTxnScope(oracle.GlobalTxnScope)
|
||||
return s.BeginWithOption(tidbkv.DefaultTransactionOption())
|
||||
}
|
||||
|
||||
// BeginWithTxnScope begins a transaction with the given txnScope (local or global)
|
||||
func (s *KVStore) BeginWithTxnScope(txnScope string) (*KVTxn, error) {
|
||||
txn, err := newTiKVTxn(s, txnScope)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return txn, nil
|
||||
}
|
||||
|
||||
// BeginWithStartTS begins a transaction with startTS.
|
||||
func (s *KVStore) BeginWithStartTS(txnScope string, startTS uint64) (*KVTxn, error) {
|
||||
txn, err := newTiKVTxnWithStartTS(s, txnScope, startTS, s.nextReplicaReadSeed())
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return txn, nil
|
||||
}
|
||||
|
||||
// BeginWithExactStaleness begins transaction with given staleness
|
||||
func (s *KVStore) BeginWithExactStaleness(txnScope string, prevSec uint64) (*KVTxn, error) {
|
||||
txn, err := newTiKVTxnWithExactStaleness(s, txnScope, prevSec)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return txn, nil
|
||||
}
|
||||
|
||||
// BeginWithMinStartTS begins transaction with the least startTS
|
||||
func (s *KVStore) BeginWithMinStartTS(txnScope string, minStartTS uint64) (*KVTxn, error) {
|
||||
stores := make([]*Store, 0)
|
||||
allStores := s.regionCache.getStoresByType(tikvrpc.TiKV)
|
||||
if txnScope != oracle.GlobalTxnScope {
|
||||
for _, store := range allStores {
|
||||
if store.IsLabelsMatch([]*metapb.StoreLabel{
|
||||
{
|
||||
Key: DCLabelKey,
|
||||
Value: txnScope,
|
||||
},
|
||||
}) {
|
||||
stores = append(stores, store)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
stores = allStores
|
||||
}
|
||||
resolveTS := s.getMinResolveTSByStores(stores)
|
||||
startTS := minStartTS
|
||||
// If the resolveTS is larger than the minStartTS, we will use resolveTS as StartTS, otherwise we will use
|
||||
// minStartTS directly.
|
||||
if oracle.CompareTS(startTS, resolveTS) < 0 {
|
||||
startTS = resolveTS
|
||||
}
|
||||
return s.BeginWithStartTS(txnScope, startTS)
|
||||
}
|
||||
|
||||
// BeginWithMaxPrevSec begins transaction with given max previous seconds for startTS
|
||||
func (s *KVStore) BeginWithMaxPrevSec(txnScope string, maxPrevSec uint64) (*KVTxn, error) {
|
||||
bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
|
||||
minStartTS, err := s.getStalenessTimestamp(bo, txnScope, maxPrevSec)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return s.BeginWithMinStartTS(txnScope, minStartTS)
|
||||
// BeginWithOption begins a transaction with the given TransactionOption
|
||||
func (s *KVStore) BeginWithOption(options tidbkv.TransactionOption) (*KVTxn, error) {
|
||||
return newTiKVTxnWithOptions(s, options)
|
||||
}
|
||||
|
||||
// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
|
||||
|
||||
@ -29,6 +29,7 @@ import (
|
||||
"github.com/pingcap/failpoint"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
tidbkv "github.com/pingcap/tidb/kv"
|
||||
drivertxn "github.com/pingcap/tidb/store/driver/txn"
|
||||
"github.com/pingcap/tidb/store/tikv"
|
||||
"github.com/pingcap/tidb/store/tikv/config"
|
||||
@ -602,12 +603,12 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) {
|
||||
// Use max.Uint64 to read the data and success.
|
||||
// That means the final commitTS > startTS+2, it's not the one we provide.
|
||||
// So we cover the rety commitTS logic.
|
||||
txn1, err := s.store.BeginWithStartTS(oracle.GlobalTxnScope, committer.GetStartTS()+2)
|
||||
txn1, err := s.store.BeginWithOption(tidbkv.DefaultTransactionOption().SetStartTs(committer.GetStartTS() + 2))
|
||||
c.Assert(err, IsNil)
|
||||
_, err = txn1.Get(bo.GetCtx(), []byte("x"))
|
||||
c.Assert(tikverr.IsErrNotFound(err), IsTrue)
|
||||
|
||||
txn2, err := s.store.BeginWithStartTS(oracle.GlobalTxnScope, math.MaxUint64)
|
||||
txn2, err := s.store.BeginWithOption(tidbkv.DefaultTransactionOption().SetStartTs(math.MaxUint64))
|
||||
c.Assert(err, IsNil)
|
||||
val, err := txn2.Get(bo.GetCtx(), []byte("x"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
@ -30,10 +30,14 @@ import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/failpoint"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
tikverr "github.com/pingcap/tidb/store/tikv/error"
|
||||
"github.com/pingcap/tidb/store/tikv/kv"
|
||||
tikv "github.com/pingcap/tidb/store/tikv/kv"
|
||||
"github.com/pingcap/tidb/store/tikv/logutil"
|
||||
"github.com/pingcap/tidb/store/tikv/metrics"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
"github.com/pingcap/tidb/store/tikv/tikvrpc"
|
||||
"github.com/pingcap/tidb/store/tikv/unionstore"
|
||||
"github.com/pingcap/tidb/store/tikv/util"
|
||||
"go.uber.org/zap"
|
||||
@ -60,7 +64,7 @@ type KVTxn struct {
|
||||
commitTS uint64
|
||||
mu sync.Mutex // For thread-safe LockKeys function.
|
||||
setCnt int64
|
||||
vars *kv.Variables
|
||||
vars *tikv.Variables
|
||||
committer *twoPhaseCommitter
|
||||
lockedCnt int
|
||||
|
||||
@ -83,44 +87,80 @@ type KVTxn struct {
|
||||
kvFilter KVFilter
|
||||
}
|
||||
|
||||
func newTiKVTxn(store *KVStore, txnScope string) (*KVTxn, error) {
|
||||
bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
|
||||
startTS, err := store.getTimestampWithRetry(bo, txnScope)
|
||||
func extractStartTs(store *KVStore, options kv.TransactionOption) (uint64, error) {
|
||||
var startTs uint64
|
||||
var err error
|
||||
if options.StartTS != nil {
|
||||
startTs = *options.StartTS
|
||||
} else if options.PrevSec != nil {
|
||||
bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
|
||||
startTs, err = store.getStalenessTimestamp(bo, options.TxnScope, *options.PrevSec)
|
||||
} else if options.MinStartTS != nil {
|
||||
stores := make([]*Store, 0)
|
||||
allStores := store.regionCache.getStoresByType(tikvrpc.TiKV)
|
||||
if options.TxnScope != oracle.GlobalTxnScope {
|
||||
for _, store := range allStores {
|
||||
if store.IsLabelsMatch([]*metapb.StoreLabel{
|
||||
{
|
||||
Key: DCLabelKey,
|
||||
Value: options.TxnScope,
|
||||
},
|
||||
}) {
|
||||
stores = append(stores, store)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
stores = allStores
|
||||
}
|
||||
resolveTS := store.getMinResolveTSByStores(stores)
|
||||
startTs = *options.MinStartTS
|
||||
// If the resolveTS is larger than the minStartTS, we will use resolveTS as StartTS, otherwise we will use
|
||||
// minStartTS directly.
|
||||
if oracle.CompareTS(startTs, resolveTS) < 0 {
|
||||
startTs = resolveTS
|
||||
}
|
||||
} else if options.MaxPrevSec != nil {
|
||||
bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
|
||||
minStartTS, err := store.getStalenessTimestamp(bo, options.TxnScope, *options.MaxPrevSec)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
options.MinStartTS = &minStartTS
|
||||
return extractStartTs(store, options)
|
||||
} else {
|
||||
bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
|
||||
startTs, err = store.getTimestampWithRetry(bo, options.TxnScope)
|
||||
}
|
||||
return startTs, err
|
||||
}
|
||||
|
||||
func newTiKVTxnWithOptions(store *KVStore, options kv.TransactionOption) (*KVTxn, error) {
|
||||
if options.TxnScope == "" {
|
||||
options.TxnScope = oracle.GlobalTxnScope
|
||||
}
|
||||
startTs, err := extractStartTs(store, options)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return newTiKVTxnWithStartTS(store, txnScope, startTS, store.nextReplicaReadSeed())
|
||||
}
|
||||
|
||||
// newTiKVTxnWithStartTS creates a txn with startTS.
|
||||
func newTiKVTxnWithStartTS(store *KVStore, txnScope string, startTS uint64, replicaReadSeed uint32) (*KVTxn, error) {
|
||||
snapshot := newTiKVSnapshot(store, startTS, replicaReadSeed)
|
||||
return &KVTxn{
|
||||
snapshot := newTiKVSnapshot(store, startTs, store.nextReplicaReadSeed())
|
||||
newTiKVTxn := &KVTxn{
|
||||
snapshot: snapshot,
|
||||
us: unionstore.NewUnionStore(snapshot),
|
||||
store: store,
|
||||
startTS: startTS,
|
||||
startTS: startTs,
|
||||
startTime: time.Now(),
|
||||
valid: true,
|
||||
vars: kv.DefaultVars,
|
||||
scope: txnScope,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newTiKVTxnWithExactStaleness(store *KVStore, txnScope string, prevSec uint64) (*KVTxn, error) {
|
||||
bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
|
||||
startTS, err := store.getStalenessTimestamp(bo, txnScope, prevSec)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
vars: tikv.DefaultVars,
|
||||
scope: options.TxnScope,
|
||||
}
|
||||
return newTiKVTxnWithStartTS(store, txnScope, startTS, store.nextReplicaReadSeed())
|
||||
return newTiKVTxn, nil
|
||||
}
|
||||
|
||||
// SetSuccess is used to probe if kv variables are set or not. It is ONLY used in test cases.
|
||||
var SetSuccess = false
|
||||
|
||||
// SetVars sets variables to the transaction.
|
||||
func (txn *KVTxn) SetVars(vars *kv.Variables) {
|
||||
func (txn *KVTxn) SetVars(vars *tikv.Variables) {
|
||||
txn.vars = vars
|
||||
txn.snapshot.vars = vars
|
||||
failpoint.Inject("probeSetVars", func(val failpoint.Value) {
|
||||
@ -131,7 +171,7 @@ func (txn *KVTxn) SetVars(vars *kv.Variables) {
|
||||
}
|
||||
|
||||
// GetVars gets variables from the transaction.
|
||||
func (txn *KVTxn) GetVars() *kv.Variables {
|
||||
func (txn *KVTxn) GetVars() *tikv.Variables {
|
||||
return txn.vars
|
||||
}
|
||||
|
||||
@ -184,7 +224,7 @@ func (txn *KVTxn) SetOption(opt int, val interface{}) {
|
||||
txn.us.SetOption(opt, val)
|
||||
txn.snapshot.SetOption(opt, val)
|
||||
switch opt {
|
||||
case kv.SchemaAmender:
|
||||
case tikv.SchemaAmender:
|
||||
txn.schemaAmender = val.(SchemaAmender)
|
||||
}
|
||||
}
|
||||
@ -442,8 +482,8 @@ func (txn *KVTxn) onCommitted(err error) {
|
||||
}
|
||||
|
||||
// LockKeys tries to lock the entries with the keys in KV store.
|
||||
// lockWaitTime in ms, except that tidbkv.LockAlwaysWait(0) means always wait lock, tidbkv.LockNowait(-1) means nowait lock
|
||||
func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput ...[]byte) error {
|
||||
// lockWaitTime in ms, except that kv.LockAlwaysWait(0) means always wait lock, kv.LockNowait(-1) means nowait lock
|
||||
func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput ...[]byte) error {
|
||||
// Exclude keys that are already locked.
|
||||
var err error
|
||||
keys := make([][]byte, 0, len(keysInput))
|
||||
@ -494,7 +534,7 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput .
|
||||
if lockCtx.ReturnValues && locked {
|
||||
// An already locked key can not return values, we add an entry to let the caller get the value
|
||||
// in other ways.
|
||||
lockCtx.Values[string(key)] = kv.ReturnedValue{AlreadyLocked: true}
|
||||
lockCtx.Values[string(key)] = tikv.ReturnedValue{AlreadyLocked: true}
|
||||
}
|
||||
}
|
||||
if len(keys) == 0 {
|
||||
@ -574,16 +614,16 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput .
|
||||
}
|
||||
}
|
||||
for _, key := range keys {
|
||||
valExists := kv.SetKeyLockedValueExists
|
||||
valExists := tikv.SetKeyLockedValueExists
|
||||
// PointGet and BatchPointGet will return value in pessimistic lock response, the value may not exist.
|
||||
// For other lock modes, the locked key values always exist.
|
||||
if lockCtx.ReturnValues {
|
||||
val, _ := lockCtx.Values[string(key)]
|
||||
if len(val.Value) == 0 {
|
||||
valExists = kv.SetKeyLockedValueNotExists
|
||||
valExists = tikv.SetKeyLockedValueNotExists
|
||||
}
|
||||
}
|
||||
memBuf.UpdateFlags(key, kv.SetKeyLocked, kv.DelNeedCheckExists, valExists)
|
||||
memBuf.UpdateFlags(key, tikv.SetKeyLocked, tikv.DelNeedCheckExists, valExists)
|
||||
}
|
||||
txn.lockedCnt += len(keys)
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user