*: dispatch the local and global transactions (#21353)
Signed-off-by: JmPotato <ghzpotato@gmail.com>
This commit is contained in:
@ -64,8 +64,8 @@ func (s *InjectedStore) Begin() (Transaction, error) {
|
||||
}
|
||||
|
||||
// BeginWithStartTS creates an injected Transaction with startTS.
|
||||
func (s *InjectedStore) BeginWithStartTS(startTS uint64) (Transaction, error) {
|
||||
txn, err := s.Storage.BeginWithStartTS(startTS)
|
||||
func (s *InjectedStore) BeginWithStartTS(txnScope string, startTS uint64) (Transaction, error) {
|
||||
txn, err := s.Storage.BeginWithStartTS(txnScope, startTS)
|
||||
return &InjectedTransaction{
|
||||
Transaction: txn,
|
||||
cfg: s.cfg,
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/parser/terror"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
)
|
||||
|
||||
type testFaultInjectionSuite struct{}
|
||||
@ -34,7 +35,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) {
|
||||
storage := NewInjectedStore(newMockStorage(), &cfg)
|
||||
txn, err := storage.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
_, err = storage.BeginWithStartTS(0)
|
||||
_, err = storage.BeginWithStartTS(oracle.GlobalTxnScope, 0)
|
||||
c.Assert(err, IsNil)
|
||||
ver := Version{Ver: 1}
|
||||
snap := storage.GetSnapshot(ver)
|
||||
|
||||
@ -149,12 +149,16 @@ func (s *mockStorage) Begin() (Transaction, error) {
|
||||
return newMockTxn(), nil
|
||||
}
|
||||
|
||||
func (s *mockStorage) BeginWithTxnScope(txnScope string) (Transaction, error) {
|
||||
return newMockTxn(), nil
|
||||
}
|
||||
|
||||
func (*mockTxn) IsPessimistic() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// BeginWithStartTS begins a transaction with startTS.
|
||||
func (s *mockStorage) BeginWithStartTS(startTS uint64) (Transaction, error) {
|
||||
// BeginWithStartTS begins transaction with given txnScope and startTS.
|
||||
func (s *mockStorage) BeginWithStartTS(txnScope string, startTS uint64) (Transaction, error) {
|
||||
return s.Begin()
|
||||
}
|
||||
|
||||
|
||||
10
kv/kv.go
10
kv/kv.go
@ -64,6 +64,8 @@ const (
|
||||
Enable1PC
|
||||
// GuaranteeExternalConsistency indicates whether to guarantee external consistency at the cost of an extra tso request before prewrite
|
||||
GuaranteeExternalConsistency
|
||||
// TxnScope indicates which @@txn_scope this transaction will work with.
|
||||
TxnScope
|
||||
)
|
||||
|
||||
// Priority value for transaction priority.
|
||||
@ -456,10 +458,12 @@ type Driver interface {
|
||||
// Storage defines the interface for storage.
|
||||
// Isolation should be at least SI(SNAPSHOT ISOLATION)
|
||||
type Storage interface {
|
||||
// Begin transaction
|
||||
// Begin a global transaction
|
||||
Begin() (Transaction, error)
|
||||
// BeginWithStartTS begins transaction with startTS.
|
||||
BeginWithStartTS(startTS uint64) (Transaction, error)
|
||||
// Begin a transaction with the given txnScope (local or global)
|
||||
BeginWithTxnScope(txnScope string) (Transaction, error)
|
||||
// BeginWithStartTS begins transaction with given txnScope and startTS.
|
||||
BeginWithStartTS(txnScope string, startTS uint64) (Transaction, error)
|
||||
// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
|
||||
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
|
||||
GetSnapshot(ver Version) Snapshot
|
||||
|
||||
@ -114,6 +114,7 @@ func IsMockCommitErrorEnable() bool {
|
||||
|
||||
// TxnInfo is used to keep track the info of a committed transaction (mainly for diagnosis and testing)
|
||||
type TxnInfo struct {
|
||||
TxnScope string `json:"txn_scope"`
|
||||
StartTS uint64 `json:"start_ts"`
|
||||
CommitTS uint64 `json:"commit_ts"`
|
||||
ErrMsg string `json:"error,omitempty"`
|
||||
|
||||
@ -64,6 +64,7 @@ import (
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/statistics/handle"
|
||||
"github.com/pingcap/tidb/store/tikv"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
"github.com/pingcap/tidb/types"
|
||||
"github.com/pingcap/tidb/util"
|
||||
"github.com/pingcap/tidb/util/chunk"
|
||||
@ -1634,6 +1635,7 @@ func (s *session) isTxnRetryable() bool {
|
||||
func (s *session) NewTxn(ctx context.Context) error {
|
||||
if s.txn.Valid() {
|
||||
txnID := s.txn.StartTS()
|
||||
txnScope := s.txn.GetUnionStore().GetOption(kv.TxnScope).(string)
|
||||
err := s.CommitTxn(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1641,10 +1643,11 @@ func (s *session) NewTxn(ctx context.Context) error {
|
||||
vars := s.GetSessionVars()
|
||||
logutil.Logger(ctx).Info("NewTxn() inside a transaction auto commit",
|
||||
zap.Int64("schemaVersion", vars.TxnCtx.SchemaVersion),
|
||||
zap.Uint64("txnStartTS", txnID))
|
||||
zap.Uint64("txnStartTS", txnID),
|
||||
zap.String("txnScope", txnScope))
|
||||
}
|
||||
|
||||
txn, err := s.store.Begin()
|
||||
txn, err := s.store.BeginWithTxnScope(s.sessionVars.CheckAndGetTxnScope())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1660,7 +1663,6 @@ func (s *session) NewTxn(ctx context.Context) error {
|
||||
CreateTime: time.Now(),
|
||||
StartTS: txn.StartTS(),
|
||||
ShardStep: int(s.sessionVars.ShardAllocateStep),
|
||||
TxnScope: s.GetSessionVars().TxnScope,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -2380,7 +2382,6 @@ func (s *session) PrepareTxnCtx(ctx context.Context) {
|
||||
SchemaVersion: is.SchemaMetaVersion(),
|
||||
CreateTime: time.Now(),
|
||||
ShardStep: int(s.sessionVars.ShardAllocateStep),
|
||||
TxnScope: s.GetSessionVars().TxnScope,
|
||||
}
|
||||
if !s.sessionVars.IsAutocommit() || s.sessionVars.RetryInfo.Retrying {
|
||||
if s.sessionVars.TxnMode == ast.Pessimistic {
|
||||
@ -2423,7 +2424,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error {
|
||||
}
|
||||
|
||||
// no need to get txn from txnFutureCh since txn should init with startTs
|
||||
txn, err := s.store.BeginWithStartTS(startTS)
|
||||
txn, err := s.store.BeginWithStartTS(oracle.GlobalTxnScope, startTS)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -2519,7 +2520,8 @@ func (s *session) recordOnTransactionExecution(err error, counter int, duration
|
||||
|
||||
func (s *session) checkPlacementPolicyBeforeCommit() error {
|
||||
var err error
|
||||
txnScope := s.GetSessionVars().TxnCtx.TxnScope
|
||||
// Get the txnScope of the transaction we're going to commit.
|
||||
txnScope := s.txn.GetUnionStore().GetOption(kv.TxnScope)
|
||||
if txnScope == "" {
|
||||
txnScope = config.DefTxnScope
|
||||
}
|
||||
|
||||
@ -33,6 +33,7 @@ import (
|
||||
"github.com/pingcap/parser/mysql"
|
||||
"github.com/pingcap/parser/terror"
|
||||
"github.com/pingcap/tidb/config"
|
||||
"github.com/pingcap/tidb/ddl/placement"
|
||||
"github.com/pingcap/tidb/domain"
|
||||
"github.com/pingcap/tidb/executor"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
@ -3259,6 +3260,138 @@ func (s *testSessionSuite2) TestSetTxnScope(c *C) {
|
||||
result.Check(testkit.Rows(oracle.GlobalTxnScope))
|
||||
}
|
||||
|
||||
func (s *testSessionSuite2) TestGlobalAndLocalTxn(c *C) {
|
||||
// Because the PD config of check_dev_2 test is not compatible with local/global txn yet,
|
||||
// so we will skip this test for now.
|
||||
if *withTiKV {
|
||||
return
|
||||
}
|
||||
tk := testkit.NewTestKitWithInit(c, s.store)
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("drop table if exists t1;")
|
||||
defer tk.MustExec("drop table if exists t1")
|
||||
tk.MustExec(`create table t1 (c int)
|
||||
PARTITION BY RANGE (c) (
|
||||
PARTITION p0 VALUES LESS THAN (100),
|
||||
PARTITION p1 VALUES LESS THAN (200)
|
||||
);`)
|
||||
// Config the Placement Rules
|
||||
bundles := make(map[string]*placement.Bundle)
|
||||
is := s.dom.InfoSchema()
|
||||
is.MockBundles(bundles)
|
||||
tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
|
||||
c.Assert(err, IsNil)
|
||||
partDefs := tb.Meta().GetPartitionInfo().Definitions
|
||||
for _, def := range partDefs {
|
||||
if def.Name.String() == "p0" {
|
||||
groupID := placement.GroupID(def.ID)
|
||||
bundles[groupID] = &placement.Bundle{
|
||||
ID: groupID,
|
||||
Rules: []*placement.Rule{
|
||||
{
|
||||
GroupID: groupID,
|
||||
Role: placement.Leader,
|
||||
Count: 1,
|
||||
LabelConstraints: []placement.LabelConstraint{
|
||||
{
|
||||
Key: placement.DCLabelKey,
|
||||
Op: placement.In,
|
||||
Values: []string{"dc-1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
} else if def.Name.String() == "p1" {
|
||||
groupID := placement.GroupID(def.ID)
|
||||
bundles[groupID] = &placement.Bundle{
|
||||
ID: groupID,
|
||||
Rules: []*placement.Rule{
|
||||
{
|
||||
GroupID: groupID,
|
||||
Role: placement.Leader,
|
||||
Count: 1,
|
||||
LabelConstraints: []placement.LabelConstraint{
|
||||
{
|
||||
Key: placement.DCLabelKey,
|
||||
Op: placement.In,
|
||||
Values: []string{"dc-2"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// set txn_scope to global
|
||||
tk.MustExec(fmt.Sprintf("set @@session.txn_scope = '%s';", oracle.GlobalTxnScope))
|
||||
result := tk.MustQuery("select @@txn_scope;")
|
||||
result.Check(testkit.Rows(oracle.GlobalTxnScope))
|
||||
// test global txn
|
||||
tk.MustExec("insert into t1 (c) values (1)") // in dc-1 with global scope
|
||||
result = tk.MustQuery("select * from t1")
|
||||
c.Assert(len(result.Rows()), Equals, 1)
|
||||
tk.MustExec("begin")
|
||||
txn, err := tk.Se.Txn(true)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(txn.GetUnionStore().GetOption(kv.TxnScope), Equals, oracle.GlobalTxnScope)
|
||||
c.Assert(txn.Valid(), IsTrue)
|
||||
tk.MustExec("insert into t1 (c) values (1)") // in dc-1 with global scope
|
||||
result = tk.MustQuery("select * from t1")
|
||||
c.Assert(len(result.Rows()), Equals, 2)
|
||||
c.Assert(txn.Valid(), IsTrue)
|
||||
tk.MustExec("commit")
|
||||
result = tk.MustQuery("select * from t1")
|
||||
c.Assert(len(result.Rows()), Equals, 2)
|
||||
tk.MustExec("insert into t1 (c) values (101)") // in dc-2 with global scope
|
||||
result = tk.MustQuery("select * from t1")
|
||||
c.Assert(len(result.Rows()), Equals, 3)
|
||||
|
||||
// set txn_scope to local
|
||||
tk.MustExec("set @@session.txn_scope = 'dc-1';")
|
||||
result = tk.MustQuery("select @@txn_scope;")
|
||||
result.Check(testkit.Rows("dc-1"))
|
||||
// test local txn
|
||||
tk.MustExec("insert into t1 (c) values (1)") // in dc-1 with dc-1 scope
|
||||
result = tk.MustQuery("select * from t1")
|
||||
c.Assert(len(result.Rows()), Equals, 4)
|
||||
tk.MustExec("begin")
|
||||
txn, err = tk.Se.Txn(true)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(txn.GetUnionStore().GetOption(kv.TxnScope), Equals, "dc-1")
|
||||
c.Assert(txn.Valid(), IsTrue)
|
||||
tk.MustExec("insert into t1 (c) values (1)") // in dc-1 with dc-1 scope
|
||||
result = tk.MustQuery("select * from t1")
|
||||
c.Assert(len(result.Rows()), Equals, 5)
|
||||
c.Assert(txn.Valid(), IsTrue)
|
||||
tk.MustExec("commit")
|
||||
result = tk.MustQuery("select * from t1")
|
||||
c.Assert(len(result.Rows()), Equals, 5)
|
||||
|
||||
// test wrong scope local txn
|
||||
_, err = tk.Exec("insert into t1 (c) values (101)") // in dc-2 with dc-1 scope
|
||||
c.Assert(err.Error(), Matches, ".*out of txn_scope.*")
|
||||
result = tk.MustQuery("select * from t1")
|
||||
c.Assert(len(result.Rows()), Equals, 5)
|
||||
tk.MustExec("begin")
|
||||
txn, err = tk.Se.Txn(true)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(txn.GetUnionStore().GetOption(kv.TxnScope), Equals, "dc-1")
|
||||
c.Assert(txn.Valid(), IsTrue)
|
||||
tk.MustExec("insert into t1 (c) values (101)") // in dc-2 with dc-1 scope
|
||||
result = tk.MustQuery("select * from t1")
|
||||
c.Assert(len(result.Rows()), Equals, 6)
|
||||
c.Assert(txn.Valid(), IsTrue)
|
||||
_, err = tk.Exec("commit")
|
||||
result = tk.MustQuery("select * from t1")
|
||||
c.Assert(len(result.Rows()), Equals, 5)
|
||||
c.Assert(err.Error(), Matches, ".*out of txn_scope.*")
|
||||
result = tk.MustQuery("select * from t1")
|
||||
c.Assert(len(result.Rows()), Equals, 5)
|
||||
}
|
||||
|
||||
func (s *testSessionSuite2) TestSetEnableRateLimitAction(c *C) {
|
||||
tk := testkit.NewTestKitWithInit(c, s.store)
|
||||
// assert default value
|
||||
|
||||
@ -331,21 +331,22 @@ func (txnFailFuture) Wait() (uint64, error) {
|
||||
|
||||
// txnFuture is a promise, which promises to return a txn in future.
|
||||
type txnFuture struct {
|
||||
future oracle.Future
|
||||
store kv.Storage
|
||||
future oracle.Future
|
||||
store kv.Storage
|
||||
txnScope string
|
||||
}
|
||||
|
||||
func (tf *txnFuture) wait() (kv.Transaction, error) {
|
||||
startTS, err := tf.future.Wait()
|
||||
if err == nil {
|
||||
return tf.store.BeginWithStartTS(startTS)
|
||||
return tf.store.BeginWithStartTS(tf.txnScope, startTS)
|
||||
} else if config.GetGlobalConfig().Store == "unistore" {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logutil.BgLogger().Warn("wait tso failed", zap.Error(err))
|
||||
// It would retry get timestamp.
|
||||
return tf.store.Begin()
|
||||
return tf.store.BeginWithTxnScope(tf.txnScope)
|
||||
}
|
||||
|
||||
func (s *session) getTxnFuture(ctx context.Context) *txnFuture {
|
||||
@ -358,11 +359,11 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture {
|
||||
oracleStore := s.store.GetOracle()
|
||||
var tsFuture oracle.Future
|
||||
if s.sessionVars.LowResolutionTSO {
|
||||
tsFuture = oracleStore.GetLowResolutionTimestampAsync(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
tsFuture = oracleStore.GetLowResolutionTimestampAsync(ctx, &oracle.Option{TxnScope: s.sessionVars.CheckAndGetTxnScope()})
|
||||
} else {
|
||||
tsFuture = oracleStore.GetTimestampAsync(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
tsFuture = oracleStore.GetTimestampAsync(ctx, &oracle.Option{TxnScope: s.sessionVars.CheckAndGetTxnScope()})
|
||||
}
|
||||
ret := &txnFuture{future: tsFuture, store: s.store}
|
||||
ret := &txnFuture{future: tsFuture, store: s.store, txnScope: s.sessionVars.CheckAndGetTxnScope()}
|
||||
failpoint.InjectContext(ctx, "mockGetTSFail", func() {
|
||||
ret.future = txnFailFuture{}
|
||||
})
|
||||
|
||||
@ -168,9 +168,6 @@ type TransactionContext struct {
|
||||
|
||||
// TableDeltaMap lock to prevent potential data race
|
||||
tdmLock sync.Mutex
|
||||
|
||||
// TxnScope stores the value of 'txn_scope'.
|
||||
TxnScope string
|
||||
}
|
||||
|
||||
// GetShard returns the shard prefix for the next `count` rowids.
|
||||
@ -766,6 +763,14 @@ type SessionVars struct {
|
||||
GuaranteeExternalConsistency bool
|
||||
}
|
||||
|
||||
// CheckAndGetTxnScope will return the transaction scope we should use in the current session.
|
||||
func (s *SessionVars) CheckAndGetTxnScope() string {
|
||||
if s.InRestrictedSQL {
|
||||
return oracle.GlobalTxnScope
|
||||
}
|
||||
return s.TxnScope
|
||||
}
|
||||
|
||||
// UseDynamicPartitionPrune indicates whether use new dynamic partition prune.
|
||||
func (s *SessionVars) UseDynamicPartitionPrune() bool {
|
||||
return PartitionPruneMode(s.PartitionPruneMode.Load()) == DynamicOnly
|
||||
|
||||
@ -1167,7 +1167,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
|
||||
} else {
|
||||
start = time.Now()
|
||||
logutil.Event(ctx, "start get commit ts")
|
||||
commitTS, err = c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars))
|
||||
commitTS, err = c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetUnionStore().GetOption(kv.TxnScope).(string))
|
||||
if err != nil {
|
||||
logutil.Logger(ctx).Warn("2PC get commitTS failed",
|
||||
zap.Error(err),
|
||||
@ -1401,7 +1401,7 @@ func (c *twoPhaseCommitter) tryAmendTxn(ctx context.Context, startInfoSchema Sch
|
||||
func (c *twoPhaseCommitter) getCommitTS(ctx context.Context, commitDetail *execdetails.CommitDetails) (uint64, error) {
|
||||
start := time.Now()
|
||||
logutil.Event(ctx, "start get commit ts")
|
||||
commitTS, err := c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars))
|
||||
commitTS, err := c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetUnionStore().GetOption(kv.TxnScope).(string))
|
||||
if err != nil {
|
||||
logutil.Logger(ctx).Warn("2PC get commitTS failed",
|
||||
zap.Error(err),
|
||||
|
||||
@ -610,12 +610,12 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) {
|
||||
// Use max.Uint64 to read the data and success.
|
||||
// That means the final commitTS > startTS+2, it's not the one we provide.
|
||||
// So we cover the rety commitTS logic.
|
||||
txn1, err := s.store.BeginWithStartTS(committer.startTS + 2)
|
||||
txn1, err := s.store.BeginWithStartTS(oracle.GlobalTxnScope, committer.startTS+2)
|
||||
c.Assert(err, IsNil)
|
||||
_, err = txn1.Get(bo.ctx, []byte("x"))
|
||||
c.Assert(kv.IsErrNotFound(err), IsTrue)
|
||||
|
||||
txn2, err := s.store.BeginWithStartTS(math.MaxUint64)
|
||||
txn2, err := s.store.BeginWithStartTS(oracle.GlobalTxnScope, math.MaxUint64)
|
||||
c.Assert(err, IsNil)
|
||||
val, err := txn2.Get(bo.ctx, []byte("x"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
@ -148,7 +148,7 @@ func (s *testAsyncCommitSuite) SetUpTest(c *C) {
|
||||
}
|
||||
|
||||
func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(c *C, keys, values [][]byte, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) {
|
||||
txn, err := newTiKVTxn(s.store)
|
||||
txn, err := newTiKVTxn(s.store, oracle.GlobalTxnScope)
|
||||
c.Assert(err, IsNil)
|
||||
txn.SetOption(kv.EnableAsyncCommit, true)
|
||||
for i, k := range keys {
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/pingcap/errors"
|
||||
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/metrics"
|
||||
"github.com/pingcap/tidb/store/tikv/tikvrpc"
|
||||
"github.com/pingcap/tidb/util/logutil"
|
||||
@ -101,7 +102,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
|
||||
}
|
||||
|
||||
// Update commit ts and retry.
|
||||
commitTS, err := c.store.getTimestampWithRetry(bo)
|
||||
commitTS, err := c.store.getTimestampWithRetry(bo, c.txn.GetUnionStore().GetOption(kv.TxnScope).(string))
|
||||
if err != nil {
|
||||
logutil.Logger(bo.ctx).Warn("2PC get commitTS failed",
|
||||
zap.Error(err),
|
||||
|
||||
@ -325,7 +325,11 @@ func (s *tikvStore) runSafePointChecker() {
|
||||
}
|
||||
|
||||
func (s *tikvStore) Begin() (kv.Transaction, error) {
|
||||
txn, err := newTiKVTxn(s)
|
||||
return s.BeginWithTxnScope(oracle.GlobalTxnScope)
|
||||
}
|
||||
|
||||
func (s *tikvStore) BeginWithTxnScope(txnScope string) (kv.Transaction, error) {
|
||||
txn, err := newTiKVTxn(s, txnScope)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
@ -333,8 +337,8 @@ func (s *tikvStore) Begin() (kv.Transaction, error) {
|
||||
}
|
||||
|
||||
// BeginWithStartTS begins a transaction with startTS.
|
||||
func (s *tikvStore) BeginWithStartTS(startTS uint64) (kv.Transaction, error) {
|
||||
txn, err := newTikvTxnWithStartTS(s, startTS, s.nextReplicaReadSeed())
|
||||
func (s *tikvStore) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transaction, error) {
|
||||
txn, err := newTiKVTxnWithStartTS(s, txnScope, startTS, s.nextReplicaReadSeed())
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
@ -382,14 +386,14 @@ func (s *tikvStore) UUID() string {
|
||||
|
||||
func (s *tikvStore) CurrentVersion() (kv.Version, error) {
|
||||
bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
|
||||
startTS, err := s.getTimestampWithRetry(bo)
|
||||
startTS, err := s.getTimestampWithRetry(bo, oracle.GlobalTxnScope)
|
||||
if err != nil {
|
||||
return kv.NewVersion(0), errors.Trace(err)
|
||||
}
|
||||
return kv.NewVersion(startTS), nil
|
||||
}
|
||||
|
||||
func (s *tikvStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) {
|
||||
func (s *tikvStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint64, error) {
|
||||
if span := opentracing.SpanFromContext(bo.ctx); span != nil && span.Tracer() != nil {
|
||||
span1 := span.Tracer().StartSpan("tikvStore.getTimestampWithRetry", opentracing.ChildOf(span.Context()))
|
||||
defer span1.Finish()
|
||||
@ -397,7 +401,7 @@ func (s *tikvStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) {
|
||||
}
|
||||
|
||||
for {
|
||||
startTS, err := s.oracle.GetTimestamp(bo.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
startTS, err := s.oracle.GetTimestamp(bo.ctx, &oracle.Option{TxnScope: txnScope})
|
||||
// mockGetTSErrorInRetry should wait MockCommitErrorOnce first, then will run into retry() logic.
|
||||
// Then mockGetTSErrorInRetry will return retryable error when first retry.
|
||||
// Before PR #8743, we don't cleanup txn after meet error such as error like: PD server timeout
|
||||
|
||||
@ -49,7 +49,7 @@ func (s *testLockSuite) TearDownTest(c *C) {
|
||||
}
|
||||
|
||||
func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) {
|
||||
txn, err := newTiKVTxn(s.store)
|
||||
txn, err := newTiKVTxn(s.store, oracle.GlobalTxnScope)
|
||||
c.Assert(err, IsNil)
|
||||
if len(value) > 0 {
|
||||
err = txn.Set(key, value)
|
||||
|
||||
@ -62,9 +62,9 @@ func (s *testStoreSuite) TestOracle(c *C) {
|
||||
s.store.oracle = o
|
||||
|
||||
ctx := context.Background()
|
||||
t1, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil))
|
||||
t1, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope)
|
||||
c.Assert(err, IsNil)
|
||||
t2, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil))
|
||||
t2, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(t1, Less, t2)
|
||||
|
||||
@ -90,7 +90,7 @@ func (s *testStoreSuite) TestOracle(c *C) {
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
t3, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, nil))
|
||||
t3, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, nil), oracle.GlobalTxnScope)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(t2, Less, t3)
|
||||
expired := s.store.oracle.IsExpired(t2, 50, &oracle.Option{})
|
||||
@ -310,10 +310,10 @@ func (s *testStoreSerialSuite) TestOracleChangeByFailpoint(c *C) {
|
||||
o := &mockoracle.MockOracle{}
|
||||
s.store.oracle = o
|
||||
ctx := context.Background()
|
||||
t1, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil))
|
||||
t1, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/oracle/changeTSFromPD"), IsNil)
|
||||
t2, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil))
|
||||
t2, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(t1, Greater, t2)
|
||||
}
|
||||
|
||||
@ -87,20 +87,20 @@ type tikvTxn struct {
|
||||
commitCallback func(info kv.TxnInfo, err error)
|
||||
}
|
||||
|
||||
func newTiKVTxn(store *tikvStore) (*tikvTxn, error) {
|
||||
func newTiKVTxn(store *tikvStore, txnScope string) (*tikvTxn, error) {
|
||||
bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
|
||||
startTS, err := store.getTimestampWithRetry(bo)
|
||||
startTS, err := store.getTimestampWithRetry(bo, txnScope)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return newTikvTxnWithStartTS(store, startTS, store.nextReplicaReadSeed())
|
||||
return newTiKVTxnWithStartTS(store, txnScope, startTS, store.nextReplicaReadSeed())
|
||||
}
|
||||
|
||||
// newTikvTxnWithStartTS creates a txn with startTS.
|
||||
func newTikvTxnWithStartTS(store *tikvStore, startTS uint64, replicaReadSeed uint32) (*tikvTxn, error) {
|
||||
// newTiKVTxnWithStartTS creates a txn with startTS.
|
||||
func newTiKVTxnWithStartTS(store *tikvStore, txnScope string, startTS uint64, replicaReadSeed uint32) (*tikvTxn, error) {
|
||||
ver := kv.NewVersion(startTS)
|
||||
snapshot := newTiKVSnapshot(store, ver, replicaReadSeed)
|
||||
return &tikvTxn{
|
||||
newTiKVTxn := &tikvTxn{
|
||||
snapshot: snapshot,
|
||||
us: kv.NewUnionStore(snapshot),
|
||||
store: store,
|
||||
@ -108,7 +108,9 @@ func newTikvTxnWithStartTS(store *tikvStore, startTS uint64, replicaReadSeed uin
|
||||
startTime: time.Now(),
|
||||
valid: true,
|
||||
vars: kv.DefaultVars,
|
||||
}, nil
|
||||
}
|
||||
newTiKVTxn.SetOption(kv.TxnScope, txnScope)
|
||||
return newTiKVTxn, nil
|
||||
}
|
||||
|
||||
type assertionPair struct {
|
||||
@ -352,7 +354,7 @@ func (txn *tikvTxn) collectLockedKeys() [][]byte {
|
||||
|
||||
func (txn *tikvTxn) onCommitted(err error) {
|
||||
if txn.commitCallback != nil {
|
||||
info := kv.TxnInfo{StartTS: txn.startTS, CommitTS: txn.commitTS}
|
||||
info := kv.TxnInfo{TxnScope: txn.GetUnionStore().GetOption(kv.TxnScope).(string), StartTS: txn.startTS, CommitTS: txn.commitTS}
|
||||
if err != nil {
|
||||
info.ErrMsg = err.Error()
|
||||
}
|
||||
|
||||
@ -25,6 +25,7 @@ import (
|
||||
"github.com/pingcap/tidb/owner"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
"github.com/pingcap/tidb/util"
|
||||
"github.com/pingcap/tidb/util/disk"
|
||||
"github.com/pingcap/tidb/util/kvcache"
|
||||
@ -182,7 +183,7 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error {
|
||||
return nil
|
||||
}
|
||||
if c.Store != nil {
|
||||
txn, err := c.Store.BeginWithStartTS(startTS)
|
||||
txn, err := c.Store.BeginWithStartTS(oracle.GlobalTxnScope, startTS)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -37,8 +37,13 @@ func (s *Store) GetOracle() oracle.Oracle { return nil }
|
||||
// Begin implements kv.Storage interface.
|
||||
func (s *Store) Begin() (kv.Transaction, error) { return nil, nil }
|
||||
|
||||
// BeginWithTxnScope implements kv.Storage interface.
|
||||
func (s *Store) BeginWithTxnScope(txnScope string) (kv.Transaction, error) { return nil, nil }
|
||||
|
||||
// BeginWithStartTS implements kv.Storage interface.
|
||||
func (s *Store) BeginWithStartTS(startTS uint64) (kv.Transaction, error) { return s.Begin() }
|
||||
func (s *Store) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transaction, error) {
|
||||
return s.Begin()
|
||||
}
|
||||
|
||||
// GetSnapshot implements kv.Storage interface.
|
||||
func (s *Store) GetSnapshot(ver kv.Version) kv.Snapshot { return nil }
|
||||
|
||||
Reference in New Issue
Block a user