diff --git a/kv/fault_injection.go b/kv/fault_injection.go index e5c3d0aebe..95a4d3dfb7 100644 --- a/kv/fault_injection.go +++ b/kv/fault_injection.go @@ -64,8 +64,8 @@ func (s *InjectedStore) Begin() (Transaction, error) { } // BeginWithStartTS creates an injected Transaction with startTS. -func (s *InjectedStore) BeginWithStartTS(startTS uint64) (Transaction, error) { - txn, err := s.Storage.BeginWithStartTS(startTS) +func (s *InjectedStore) BeginWithStartTS(txnScope string, startTS uint64) (Transaction, error) { + txn, err := s.Storage.BeginWithStartTS(txnScope, startTS) return &InjectedTransaction{ Transaction: txn, cfg: s.cfg, diff --git a/kv/fault_injection_test.go b/kv/fault_injection_test.go index 500dd3a536..b137481c05 100644 --- a/kv/fault_injection_test.go +++ b/kv/fault_injection_test.go @@ -19,6 +19,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/store/tikv/oracle" ) type testFaultInjectionSuite struct{} @@ -34,7 +35,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) { storage := NewInjectedStore(newMockStorage(), &cfg) txn, err := storage.Begin() c.Assert(err, IsNil) - _, err = storage.BeginWithStartTS(0) + _, err = storage.BeginWithStartTS(oracle.GlobalTxnScope, 0) c.Assert(err, IsNil) ver := Version{Ver: 1} snap := storage.GetSnapshot(ver) diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 79a2df1cf6..1fc3f0fd60 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -149,12 +149,16 @@ func (s *mockStorage) Begin() (Transaction, error) { return newMockTxn(), nil } +func (s *mockStorage) BeginWithTxnScope(txnScope string) (Transaction, error) { + return newMockTxn(), nil +} + func (*mockTxn) IsPessimistic() bool { return false } -// BeginWithStartTS begins a transaction with startTS. -func (s *mockStorage) BeginWithStartTS(startTS uint64) (Transaction, error) { +// BeginWithStartTS begins transaction with given txnScope and startTS. +func (s *mockStorage) BeginWithStartTS(txnScope string, startTS uint64) (Transaction, error) { return s.Begin() } diff --git a/kv/kv.go b/kv/kv.go index 7545370195..64056a8c04 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -64,6 +64,8 @@ const ( Enable1PC // GuaranteeExternalConsistency indicates whether to guarantee external consistency at the cost of an extra tso request before prewrite GuaranteeExternalConsistency + // TxnScope indicates which @@txn_scope this transaction will work with. + TxnScope ) // Priority value for transaction priority. @@ -456,10 +458,12 @@ type Driver interface { // Storage defines the interface for storage. // Isolation should be at least SI(SNAPSHOT ISOLATION) type Storage interface { - // Begin transaction + // Begin a global transaction Begin() (Transaction, error) - // BeginWithStartTS begins transaction with startTS. - BeginWithStartTS(startTS uint64) (Transaction, error) + // Begin a transaction with the given txnScope (local or global) + BeginWithTxnScope(txnScope string) (Transaction, error) + // BeginWithStartTS begins transaction with given txnScope and startTS. + BeginWithStartTS(txnScope string, startTS uint64) (Transaction, error) // GetSnapshot gets a snapshot that is able to read any data which data is <= ver. // if ver is MaxVersion or > current max committed version, we will use current version for this snapshot. GetSnapshot(ver Version) Snapshot diff --git a/kv/txn.go b/kv/txn.go index 4592282f96..9bc693232b 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -114,6 +114,7 @@ func IsMockCommitErrorEnable() bool { // TxnInfo is used to keep track the info of a committed transaction (mainly for diagnosis and testing) type TxnInfo struct { + TxnScope string `json:"txn_scope"` StartTS uint64 `json:"start_ts"` CommitTS uint64 `json:"commit_ts"` ErrMsg string `json:"error,omitempty"` diff --git a/session/session.go b/session/session.go index c6db2a4b4a..b2750946d7 100644 --- a/session/session.go +++ b/session/session.go @@ -64,6 +64,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" @@ -1634,6 +1635,7 @@ func (s *session) isTxnRetryable() bool { func (s *session) NewTxn(ctx context.Context) error { if s.txn.Valid() { txnID := s.txn.StartTS() + txnScope := s.txn.GetUnionStore().GetOption(kv.TxnScope).(string) err := s.CommitTxn(ctx) if err != nil { return err @@ -1641,10 +1643,11 @@ func (s *session) NewTxn(ctx context.Context) error { vars := s.GetSessionVars() logutil.Logger(ctx).Info("NewTxn() inside a transaction auto commit", zap.Int64("schemaVersion", vars.TxnCtx.SchemaVersion), - zap.Uint64("txnStartTS", txnID)) + zap.Uint64("txnStartTS", txnID), + zap.String("txnScope", txnScope)) } - txn, err := s.store.Begin() + txn, err := s.store.BeginWithTxnScope(s.sessionVars.CheckAndGetTxnScope()) if err != nil { return err } @@ -1660,7 +1663,6 @@ func (s *session) NewTxn(ctx context.Context) error { CreateTime: time.Now(), StartTS: txn.StartTS(), ShardStep: int(s.sessionVars.ShardAllocateStep), - TxnScope: s.GetSessionVars().TxnScope, } return nil } @@ -2380,7 +2382,6 @@ func (s *session) PrepareTxnCtx(ctx context.Context) { SchemaVersion: is.SchemaMetaVersion(), CreateTime: time.Now(), ShardStep: int(s.sessionVars.ShardAllocateStep), - TxnScope: s.GetSessionVars().TxnScope, } if !s.sessionVars.IsAutocommit() || s.sessionVars.RetryInfo.Retrying { if s.sessionVars.TxnMode == ast.Pessimistic { @@ -2423,7 +2424,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error { } // no need to get txn from txnFutureCh since txn should init with startTs - txn, err := s.store.BeginWithStartTS(startTS) + txn, err := s.store.BeginWithStartTS(oracle.GlobalTxnScope, startTS) if err != nil { return err } @@ -2519,7 +2520,8 @@ func (s *session) recordOnTransactionExecution(err error, counter int, duration func (s *session) checkPlacementPolicyBeforeCommit() error { var err error - txnScope := s.GetSessionVars().TxnCtx.TxnScope + // Get the txnScope of the transaction we're going to commit. + txnScope := s.txn.GetUnionStore().GetOption(kv.TxnScope) if txnScope == "" { txnScope = config.DefTxnScope } diff --git a/session/session_test.go b/session/session_test.go index 4b5e227c18..bb8e4a9bba 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" @@ -3259,6 +3260,138 @@ func (s *testSessionSuite2) TestSetTxnScope(c *C) { result.Check(testkit.Rows(oracle.GlobalTxnScope)) } +func (s *testSessionSuite2) TestGlobalAndLocalTxn(c *C) { + // Because the PD config of check_dev_2 test is not compatible with local/global txn yet, + // so we will skip this test for now. + if *withTiKV { + return + } + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1;") + defer tk.MustExec("drop table if exists t1") + tk.MustExec(`create table t1 (c int) +PARTITION BY RANGE (c) ( + PARTITION p0 VALUES LESS THAN (100), + PARTITION p1 VALUES LESS THAN (200) +);`) + // Config the Placement Rules + bundles := make(map[string]*placement.Bundle) + is := s.dom.InfoSchema() + is.MockBundles(bundles) + tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + c.Assert(err, IsNil) + partDefs := tb.Meta().GetPartitionInfo().Definitions + for _, def := range partDefs { + if def.Name.String() == "p0" { + groupID := placement.GroupID(def.ID) + bundles[groupID] = &placement.Bundle{ + ID: groupID, + Rules: []*placement.Rule{ + { + GroupID: groupID, + Role: placement.Leader, + Count: 1, + LabelConstraints: []placement.LabelConstraint{ + { + Key: placement.DCLabelKey, + Op: placement.In, + Values: []string{"dc-1"}, + }, + }, + }, + }, + } + } else if def.Name.String() == "p1" { + groupID := placement.GroupID(def.ID) + bundles[groupID] = &placement.Bundle{ + ID: groupID, + Rules: []*placement.Rule{ + { + GroupID: groupID, + Role: placement.Leader, + Count: 1, + LabelConstraints: []placement.LabelConstraint{ + { + Key: placement.DCLabelKey, + Op: placement.In, + Values: []string{"dc-2"}, + }, + }, + }, + }, + } + + } + } + + // set txn_scope to global + tk.MustExec(fmt.Sprintf("set @@session.txn_scope = '%s';", oracle.GlobalTxnScope)) + result := tk.MustQuery("select @@txn_scope;") + result.Check(testkit.Rows(oracle.GlobalTxnScope)) + // test global txn + tk.MustExec("insert into t1 (c) values (1)") // in dc-1 with global scope + result = tk.MustQuery("select * from t1") + c.Assert(len(result.Rows()), Equals, 1) + tk.MustExec("begin") + txn, err := tk.Se.Txn(true) + c.Assert(err, IsNil) + c.Assert(txn.GetUnionStore().GetOption(kv.TxnScope), Equals, oracle.GlobalTxnScope) + c.Assert(txn.Valid(), IsTrue) + tk.MustExec("insert into t1 (c) values (1)") // in dc-1 with global scope + result = tk.MustQuery("select * from t1") + c.Assert(len(result.Rows()), Equals, 2) + c.Assert(txn.Valid(), IsTrue) + tk.MustExec("commit") + result = tk.MustQuery("select * from t1") + c.Assert(len(result.Rows()), Equals, 2) + tk.MustExec("insert into t1 (c) values (101)") // in dc-2 with global scope + result = tk.MustQuery("select * from t1") + c.Assert(len(result.Rows()), Equals, 3) + + // set txn_scope to local + tk.MustExec("set @@session.txn_scope = 'dc-1';") + result = tk.MustQuery("select @@txn_scope;") + result.Check(testkit.Rows("dc-1")) + // test local txn + tk.MustExec("insert into t1 (c) values (1)") // in dc-1 with dc-1 scope + result = tk.MustQuery("select * from t1") + c.Assert(len(result.Rows()), Equals, 4) + tk.MustExec("begin") + txn, err = tk.Se.Txn(true) + c.Assert(err, IsNil) + c.Assert(txn.GetUnionStore().GetOption(kv.TxnScope), Equals, "dc-1") + c.Assert(txn.Valid(), IsTrue) + tk.MustExec("insert into t1 (c) values (1)") // in dc-1 with dc-1 scope + result = tk.MustQuery("select * from t1") + c.Assert(len(result.Rows()), Equals, 5) + c.Assert(txn.Valid(), IsTrue) + tk.MustExec("commit") + result = tk.MustQuery("select * from t1") + c.Assert(len(result.Rows()), Equals, 5) + + // test wrong scope local txn + _, err = tk.Exec("insert into t1 (c) values (101)") // in dc-2 with dc-1 scope + c.Assert(err.Error(), Matches, ".*out of txn_scope.*") + result = tk.MustQuery("select * from t1") + c.Assert(len(result.Rows()), Equals, 5) + tk.MustExec("begin") + txn, err = tk.Se.Txn(true) + c.Assert(err, IsNil) + c.Assert(txn.GetUnionStore().GetOption(kv.TxnScope), Equals, "dc-1") + c.Assert(txn.Valid(), IsTrue) + tk.MustExec("insert into t1 (c) values (101)") // in dc-2 with dc-1 scope + result = tk.MustQuery("select * from t1") + c.Assert(len(result.Rows()), Equals, 6) + c.Assert(txn.Valid(), IsTrue) + _, err = tk.Exec("commit") + result = tk.MustQuery("select * from t1") + c.Assert(len(result.Rows()), Equals, 5) + c.Assert(err.Error(), Matches, ".*out of txn_scope.*") + result = tk.MustQuery("select * from t1") + c.Assert(len(result.Rows()), Equals, 5) +} + func (s *testSessionSuite2) TestSetEnableRateLimitAction(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) // assert default value diff --git a/session/txn.go b/session/txn.go index ff9c41df9d..7c32829bc3 100644 --- a/session/txn.go +++ b/session/txn.go @@ -331,21 +331,22 @@ func (txnFailFuture) Wait() (uint64, error) { // txnFuture is a promise, which promises to return a txn in future. type txnFuture struct { - future oracle.Future - store kv.Storage + future oracle.Future + store kv.Storage + txnScope string } func (tf *txnFuture) wait() (kv.Transaction, error) { startTS, err := tf.future.Wait() if err == nil { - return tf.store.BeginWithStartTS(startTS) + return tf.store.BeginWithStartTS(tf.txnScope, startTS) } else if config.GetGlobalConfig().Store == "unistore" { return nil, err } logutil.BgLogger().Warn("wait tso failed", zap.Error(err)) // It would retry get timestamp. - return tf.store.Begin() + return tf.store.BeginWithTxnScope(tf.txnScope) } func (s *session) getTxnFuture(ctx context.Context) *txnFuture { @@ -358,11 +359,11 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture { oracleStore := s.store.GetOracle() var tsFuture oracle.Future if s.sessionVars.LowResolutionTSO { - tsFuture = oracleStore.GetLowResolutionTimestampAsync(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + tsFuture = oracleStore.GetLowResolutionTimestampAsync(ctx, &oracle.Option{TxnScope: s.sessionVars.CheckAndGetTxnScope()}) } else { - tsFuture = oracleStore.GetTimestampAsync(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + tsFuture = oracleStore.GetTimestampAsync(ctx, &oracle.Option{TxnScope: s.sessionVars.CheckAndGetTxnScope()}) } - ret := &txnFuture{future: tsFuture, store: s.store} + ret := &txnFuture{future: tsFuture, store: s.store, txnScope: s.sessionVars.CheckAndGetTxnScope()} failpoint.InjectContext(ctx, "mockGetTSFail", func() { ret.future = txnFailFuture{} }) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index e7222477de..1fa3b5758a 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -168,9 +168,6 @@ type TransactionContext struct { // TableDeltaMap lock to prevent potential data race tdmLock sync.Mutex - - // TxnScope stores the value of 'txn_scope'. - TxnScope string } // GetShard returns the shard prefix for the next `count` rowids. @@ -766,6 +763,14 @@ type SessionVars struct { GuaranteeExternalConsistency bool } +// CheckAndGetTxnScope will return the transaction scope we should use in the current session. +func (s *SessionVars) CheckAndGetTxnScope() string { + if s.InRestrictedSQL { + return oracle.GlobalTxnScope + } + return s.TxnScope +} + // UseDynamicPartitionPrune indicates whether use new dynamic partition prune. func (s *SessionVars) UseDynamicPartitionPrune() bool { return PartitionPruneMode(s.PartitionPruneMode.Load()) == DynamicOnly diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 3b305bd2aa..9180ca8670 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -1167,7 +1167,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } else { start = time.Now() logutil.Event(ctx, "start get commit ts") - commitTS, err = c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars)) + commitTS, err = c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetUnionStore().GetOption(kv.TxnScope).(string)) if err != nil { logutil.Logger(ctx).Warn("2PC get commitTS failed", zap.Error(err), @@ -1401,7 +1401,7 @@ func (c *twoPhaseCommitter) tryAmendTxn(ctx context.Context, startInfoSchema Sch func (c *twoPhaseCommitter) getCommitTS(ctx context.Context, commitDetail *execdetails.CommitDetails) (uint64, error) { start := time.Now() logutil.Event(ctx, "start get commit ts") - commitTS, err := c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars)) + commitTS, err := c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetUnionStore().GetOption(kv.TxnScope).(string)) if err != nil { logutil.Logger(ctx).Warn("2PC get commitTS failed", zap.Error(err), diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index c11fe894c1..44b5f81701 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -610,12 +610,12 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) { // Use max.Uint64 to read the data and success. // That means the final commitTS > startTS+2, it's not the one we provide. // So we cover the rety commitTS logic. - txn1, err := s.store.BeginWithStartTS(committer.startTS + 2) + txn1, err := s.store.BeginWithStartTS(oracle.GlobalTxnScope, committer.startTS+2) c.Assert(err, IsNil) _, err = txn1.Get(bo.ctx, []byte("x")) c.Assert(kv.IsErrNotFound(err), IsTrue) - txn2, err := s.store.BeginWithStartTS(math.MaxUint64) + txn2, err := s.store.BeginWithStartTS(oracle.GlobalTxnScope, math.MaxUint64) c.Assert(err, IsNil) val, err := txn2.Get(bo.ctx, []byte("x")) c.Assert(err, IsNil) diff --git a/store/tikv/async_commit_test.go b/store/tikv/async_commit_test.go index 987b224a34..490a0beacf 100644 --- a/store/tikv/async_commit_test.go +++ b/store/tikv/async_commit_test.go @@ -148,7 +148,7 @@ func (s *testAsyncCommitSuite) SetUpTest(c *C) { } func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(c *C, keys, values [][]byte, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) { - txn, err := newTiKVTxn(s.store) + txn, err := newTiKVTxn(s.store, oracle.GlobalTxnScope) c.Assert(err, IsNil) txn.SetOption(kv.EnableAsyncCommit, true) for i, k := range keys { diff --git a/store/tikv/commit.go b/store/tikv/commit.go index 5b157bb74a..7680230e76 100644 --- a/store/tikv/commit.go +++ b/store/tikv/commit.go @@ -19,6 +19,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/util/logutil" @@ -101,7 +102,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch } // Update commit ts and retry. - commitTS, err := c.store.getTimestampWithRetry(bo) + commitTS, err := c.store.getTimestampWithRetry(bo, c.txn.GetUnionStore().GetOption(kv.TxnScope).(string)) if err != nil { logutil.Logger(bo.ctx).Warn("2PC get commitTS failed", zap.Error(err), diff --git a/store/tikv/kv.go b/store/tikv/kv.go index dfe8159bb2..4d78a6d6a4 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -325,7 +325,11 @@ func (s *tikvStore) runSafePointChecker() { } func (s *tikvStore) Begin() (kv.Transaction, error) { - txn, err := newTiKVTxn(s) + return s.BeginWithTxnScope(oracle.GlobalTxnScope) +} + +func (s *tikvStore) BeginWithTxnScope(txnScope string) (kv.Transaction, error) { + txn, err := newTiKVTxn(s, txnScope) if err != nil { return nil, errors.Trace(err) } @@ -333,8 +337,8 @@ func (s *tikvStore) Begin() (kv.Transaction, error) { } // BeginWithStartTS begins a transaction with startTS. -func (s *tikvStore) BeginWithStartTS(startTS uint64) (kv.Transaction, error) { - txn, err := newTikvTxnWithStartTS(s, startTS, s.nextReplicaReadSeed()) +func (s *tikvStore) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transaction, error) { + txn, err := newTiKVTxnWithStartTS(s, txnScope, startTS, s.nextReplicaReadSeed()) if err != nil { return nil, errors.Trace(err) } @@ -382,14 +386,14 @@ func (s *tikvStore) UUID() string { func (s *tikvStore) CurrentVersion() (kv.Version, error) { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) - startTS, err := s.getTimestampWithRetry(bo) + startTS, err := s.getTimestampWithRetry(bo, oracle.GlobalTxnScope) if err != nil { return kv.NewVersion(0), errors.Trace(err) } return kv.NewVersion(startTS), nil } -func (s *tikvStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) { +func (s *tikvStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint64, error) { if span := opentracing.SpanFromContext(bo.ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("tikvStore.getTimestampWithRetry", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -397,7 +401,7 @@ func (s *tikvStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) { } for { - startTS, err := s.oracle.GetTimestamp(bo.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + startTS, err := s.oracle.GetTimestamp(bo.ctx, &oracle.Option{TxnScope: txnScope}) // mockGetTSErrorInRetry should wait MockCommitErrorOnce first, then will run into retry() logic. // Then mockGetTSErrorInRetry will return retryable error when first retry. // Before PR #8743, we don't cleanup txn after meet error such as error like: PD server timeout diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 11a29b3fc4..e7a18b7f5d 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -49,7 +49,7 @@ func (s *testLockSuite) TearDownTest(c *C) { } func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) { - txn, err := newTiKVTxn(s.store) + txn, err := newTiKVTxn(s.store, oracle.GlobalTxnScope) c.Assert(err, IsNil) if len(value) > 0 { err = txn.Set(key, value) diff --git a/store/tikv/store_test.go b/store/tikv/store_test.go index 3dcdd7a9b1..394c93782a 100644 --- a/store/tikv/store_test.go +++ b/store/tikv/store_test.go @@ -62,9 +62,9 @@ func (s *testStoreSuite) TestOracle(c *C) { s.store.oracle = o ctx := context.Background() - t1, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil)) + t1, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope) c.Assert(err, IsNil) - t2, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil)) + t2, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope) c.Assert(err, IsNil) c.Assert(t1, Less, t2) @@ -90,7 +90,7 @@ func (s *testStoreSuite) TestOracle(c *C) { go func() { defer wg.Done() - t3, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, nil)) + t3, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, nil), oracle.GlobalTxnScope) c.Assert(err, IsNil) c.Assert(t2, Less, t3) expired := s.store.oracle.IsExpired(t2, 50, &oracle.Option{}) @@ -310,10 +310,10 @@ func (s *testStoreSerialSuite) TestOracleChangeByFailpoint(c *C) { o := &mockoracle.MockOracle{} s.store.oracle = o ctx := context.Background() - t1, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil)) + t1, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope) c.Assert(err, IsNil) c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/oracle/changeTSFromPD"), IsNil) - t2, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil)) + t2, err := s.store.getTimestampWithRetry(NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope) c.Assert(err, IsNil) c.Assert(t1, Greater, t2) } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index cf9bbfe5f9..87a17947da 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -87,20 +87,20 @@ type tikvTxn struct { commitCallback func(info kv.TxnInfo, err error) } -func newTiKVTxn(store *tikvStore) (*tikvTxn, error) { +func newTiKVTxn(store *tikvStore, txnScope string) (*tikvTxn, error) { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) - startTS, err := store.getTimestampWithRetry(bo) + startTS, err := store.getTimestampWithRetry(bo, txnScope) if err != nil { return nil, errors.Trace(err) } - return newTikvTxnWithStartTS(store, startTS, store.nextReplicaReadSeed()) + return newTiKVTxnWithStartTS(store, txnScope, startTS, store.nextReplicaReadSeed()) } -// newTikvTxnWithStartTS creates a txn with startTS. -func newTikvTxnWithStartTS(store *tikvStore, startTS uint64, replicaReadSeed uint32) (*tikvTxn, error) { +// newTiKVTxnWithStartTS creates a txn with startTS. +func newTiKVTxnWithStartTS(store *tikvStore, txnScope string, startTS uint64, replicaReadSeed uint32) (*tikvTxn, error) { ver := kv.NewVersion(startTS) snapshot := newTiKVSnapshot(store, ver, replicaReadSeed) - return &tikvTxn{ + newTiKVTxn := &tikvTxn{ snapshot: snapshot, us: kv.NewUnionStore(snapshot), store: store, @@ -108,7 +108,9 @@ func newTikvTxnWithStartTS(store *tikvStore, startTS uint64, replicaReadSeed uin startTime: time.Now(), valid: true, vars: kv.DefaultVars, - }, nil + } + newTiKVTxn.SetOption(kv.TxnScope, txnScope) + return newTiKVTxn, nil } type assertionPair struct { @@ -352,7 +354,7 @@ func (txn *tikvTxn) collectLockedKeys() [][]byte { func (txn *tikvTxn) onCommitted(err error) { if txn.commitCallback != nil { - info := kv.TxnInfo{StartTS: txn.startTS, CommitTS: txn.commitTS} + info := kv.TxnInfo{TxnScope: txn.GetUnionStore().GetOption(kv.TxnScope).(string), StartTS: txn.startTS, CommitTS: txn.commitTS} if err != nil { info.ErrMsg = err.Error() } diff --git a/util/mock/context.go b/util/mock/context.go index ec77451894..6a23720cac 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/kvcache" @@ -182,7 +183,7 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error { return nil } if c.Store != nil { - txn, err := c.Store.BeginWithStartTS(startTS) + txn, err := c.Store.BeginWithStartTS(oracle.GlobalTxnScope, startTS) if err != nil { return errors.Trace(err) } diff --git a/util/mock/store.go b/util/mock/store.go index 374f25ece5..a86d18da31 100644 --- a/util/mock/store.go +++ b/util/mock/store.go @@ -37,8 +37,13 @@ func (s *Store) GetOracle() oracle.Oracle { return nil } // Begin implements kv.Storage interface. func (s *Store) Begin() (kv.Transaction, error) { return nil, nil } +// BeginWithTxnScope implements kv.Storage interface. +func (s *Store) BeginWithTxnScope(txnScope string) (kv.Transaction, error) { return nil, nil } + // BeginWithStartTS implements kv.Storage interface. -func (s *Store) BeginWithStartTS(startTS uint64) (kv.Transaction, error) { return s.Begin() } +func (s *Store) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transaction, error) { + return s.Begin() +} // GetSnapshot implements kv.Storage interface. func (s *Store) GetSnapshot(ver kv.Version) kv.Snapshot { return nil }