diff --git a/executor/executor_test.go b/executor/executor_test.go index 44ea606ed4..df524954fe 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -7383,3 +7383,64 @@ func (s *testSuite) TestIssue15563(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustQuery("select distinct 0.7544678906163867 / 0.68234634;").Check(testkit.Rows("1.10569639842486251190")) } + +func (s *testSuite) TestStalenessTransaction(c *C) { + testcases := []struct { + name string + preSQL string + sql string + IsStaleness bool + expectPhysicalTS int64 + preSec int64 + }{ + { + name: "TimestampBoundReadTimestamp", + preSQL: "begin", + sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`, + IsStaleness: true, + expectPhysicalTS: 1599321600000, + }, + { + name: "TimestampBoundExactStaleness", + preSQL: "begin", + sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:20';`, + IsStaleness: true, + preSec: 20, + }, + { + name: "TimestampBoundExactStaleness", + preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`, + sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:20';`, + IsStaleness: true, + preSec: 20, + }, + { + name: "begin", + preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`, + sql: "begin", + IsStaleness: false, + }, + } + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + for _, testcase := range testcases { + c.Log(testcase.name) + tk.MustExec(testcase.preSQL) + tk.MustExec(testcase.sql) + if testcase.expectPhysicalTS > 0 { + c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, testcase.expectPhysicalTS) + } else if testcase.preSec > 0 { + curSec := time.Now().Unix() + startTS := oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS) + c.Assert(startTS, Greater, (curSec-testcase.preSec-2)*1000) + c.Assert(startTS, Less, (curSec-testcase.preSec+2)*1000) + } else if !testcase.IsStaleness { + curSec := time.Now().Unix() + startTS := oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS) + c.Assert(curSec*1000-startTS, Less, time.Second/time.Millisecond) + c.Assert(startTS-curSec*1000, Less, time.Second/time.Millisecond) + } + c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, testcase.IsStaleness) + tk.MustExec("commit") + } +} diff --git a/executor/simple.go b/executor/simple.go index dd999892ef..665be9061e 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -37,7 +37,10 @@ import ( "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/types" + driver "github.com/pingcap/tidb/types/parser_driver" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/dbterror" @@ -557,10 +560,17 @@ func (e *SimpleExec) executeUse(s *ast.UseStmt) error { } func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error { + // If `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND` is the first statement in TxnCtx, we should + // always create a new Txn instead of reusing it. + if s.ReadOnly && s.Bound != nil { + return e.executeStartTransactionReadOnlyWithTimestampBound(ctx, s) + } // If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the // need to call NewTxn, which commits the existing transaction and begins a new one. + // If the last un-committed/un-rollback transaction is a time-bounded read-only transaction, we should + // always create a new transaction. txnCtx := e.ctx.GetSessionVars().TxnCtx - if txnCtx.History != nil { + if txnCtx.History != nil || txnCtx.IsStaleness { err := e.ctx.NewTxn(ctx) if err != nil { return err @@ -588,6 +598,49 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error { return nil } +func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx context.Context, s *ast.BeginStmt) error { + opt := sessionctx.StalenessTxnOption{} + opt.Mode = s.Bound.Mode + switch s.Bound.Mode { + case ast.TimestampBoundReadTimestamp: + // TODO: support funcCallExpr in future + v, ok := s.Bound.Timestamp.(*driver.ValueExpr) + if !ok { + return errors.New("Invalid value for Bound Timestamp") + } + t, err := types.ParseTime(e.ctx.GetSessionVars().StmtCtx, v.GetString(), v.GetType().Tp, types.GetFsp(v.GetString())) + if err != nil { + return err + } + gt, err := t.GoTime(e.ctx.GetSessionVars().TimeZone) + if err != nil { + return err + } + startTS := oracle.ComposeTS(gt.Unix()*1000, 0) + opt.StartTS = startTS + case ast.TimestampBoundExactStaleness: + // TODO: support funcCallExpr in future + v, ok := s.Bound.Timestamp.(*driver.ValueExpr) + if !ok { + return errors.New("Invalid value for Bound Timestamp") + } + d, err := types.ParseDuration(e.ctx.GetSessionVars().StmtCtx, v.GetString(), types.GetFsp(v.GetString())) + if err != nil { + return err + } + opt.PrevSec = uint64(d.Seconds()) + } + err := e.ctx.NewTxnWithStalenessOption(ctx, opt) + if err != nil { + return err + } + // With START TRANSACTION, autocommit remains disabled until you end + // the transaction with COMMIT or ROLLBACK. The autocommit mode then + // reverts to its previous state. + e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true) + return nil +} + func (e *SimpleExec) executeRevokeRole(s *ast.RevokeRoleStmt) error { for _, role := range s.Roles { exists, err := userExists(e.ctx, role.Username, role.Hostname) diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 348a37b729..c2114bd1e7 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -162,6 +162,11 @@ func (s *mockStorage) BeginWithStartTS(txnScope string, startTS uint64) (Transac return s.Begin() } +// BeginWithExactStaleness begins transaction with given exact staleness +func (s *mockStorage) BeginWithExactStaleness(txnScope string, prevSec uint64) (Transaction, error) { + return s.Begin() +} + func (s *mockStorage) GetSnapshot(ver Version) Snapshot { return &mockSnapshot{ store: newMemDB(), diff --git a/kv/kv.go b/kv/kv.go index c43435d0e5..b46f2b0a1e 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -66,6 +66,8 @@ const ( GuaranteeExternalConsistency // TxnScope indicates which @@txn_scope this transaction will work with. TxnScope + // StalenessReadOnly indicates whether the transaction is staleness read only transaction + IsStalenessReadOnly ) // Priority value for transaction priority. @@ -464,6 +466,8 @@ type Storage interface { BeginWithTxnScope(txnScope string) (Transaction, error) // BeginWithStartTS begins transaction with given txnScope and startTS. BeginWithStartTS(txnScope string, startTS uint64) (Transaction, error) + // BeginWithStalenessTS begins transaction with given staleness + BeginWithExactStaleness(txnScope string, prevSec uint64) (Transaction, error) // GetSnapshot gets a snapshot that is able to read any data which data is <= ver. // if ver is MaxVersion or > current max committed version, we will use current version for this snapshot. GetSnapshot(ver Version) Snapshot diff --git a/session/session.go b/session/session.go index 38bc00b801..2d2d17c383 100644 --- a/session/session.go +++ b/session/session.go @@ -1651,6 +1651,7 @@ func (s *session) NewTxn(ctx context.Context) error { CreateTime: time.Now(), StartTS: txn.StartTS(), ShardStep: int(s.sessionVars.ShardAllocateStep), + IsStaleness: false, TxnScope: s.sessionVars.CheckAndGetTxnScope(), } return nil @@ -2462,6 +2463,56 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error { return nil } +// NewTxnWithStalenessOption create a transaction with Staleness option +func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error { + if s.txn.Valid() { + txnID := s.txn.StartTS() + txnScope := s.txn.GetUnionStore().GetOption(kv.TxnScope).(string) + err := s.CommitTxn(ctx) + if err != nil { + return err + } + vars := s.GetSessionVars() + logutil.Logger(ctx).Info("InitTxnWithExactStaleness() inside a transaction auto commit", + zap.Int64("schemaVersion", vars.TxnCtx.SchemaVersion), + zap.Uint64("txnStartTS", txnID), + zap.String("txnScope", txnScope)) + } + var txn kv.Transaction + var err error + txnScope := s.GetSessionVars().TxnScope + switch option.Mode { + case ast.TimestampBoundReadTimestamp: + txn, err = s.store.BeginWithStartTS(txnScope, option.StartTS) + if err != nil { + return err + } + case ast.TimestampBoundExactStaleness: + txn, err = s.store.BeginWithExactStaleness(txnScope, option.PrevSec) + if err != nil { + return err + } + default: + // For unsupported staleness txn cases, fallback to NewTxn + return s.NewTxn(ctx) + } + txn.SetVars(s.sessionVars.KVVars) + txn.SetOption(kv.IsStalenessReadOnly, true) + txn.SetOption(kv.TxnScope, txnScope) + s.txn.changeInvalidToValid(txn) + is := domain.GetDomain(s).InfoSchema() + s.sessionVars.TxnCtx = &variable.TransactionContext{ + InfoSchema: is, + SchemaVersion: is.SchemaMetaVersion(), + CreateTime: time.Now(), + StartTS: txn.StartTS(), + ShardStep: int(s.sessionVars.ShardAllocateStep), + IsStaleness: true, + TxnScope: txnScope, + } + return nil +} + // GetStore gets the store of session. func (s *session) GetStore() kv.Storage { return s.store diff --git a/sessionctx/context.go b/sessionctx/context.go index ea0e8cf2c4..f720a1bcd0 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -17,6 +17,7 @@ import ( "context" "fmt" + "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/owner" @@ -71,6 +72,9 @@ type Context interface { // It should be called right before we builds an executor. InitTxnWithStartTS(startTS uint64) error + // NewTxnWithStalenessOption initializes a transaction with StalenessTxnOption + NewTxnWithStalenessOption(ctx context.Context, option StalenessTxnOption) error + // GetStore returns the store of session. GetStore() kv.Storage @@ -144,3 +148,10 @@ var ConnID = connIDCtxKeyType{} func SetCommitCtx(ctx context.Context, sessCtx Context) context.Context { return context.WithValue(ctx, ConnID, sessCtx.GetSessionVars().ConnectionID) } + +// StalenessTxnOption represents available options for the InitTxnWithStaleness +type StalenessTxnOption struct { + Mode ast.TimestampBoundMode + PrevSec uint64 + StartTS uint64 +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index b7e9464b91..cafb9bc6b7 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -162,9 +162,11 @@ type TransactionContext struct { StatementCount int CouldRetry bool IsPessimistic bool - Isolation string - LockExpire uint32 - ForUpdate uint32 + // IsStaleness indicates whether the txn is read only staleness txn. + IsStaleness bool + Isolation string + LockExpire uint32 + ForUpdate uint32 // TxnScope indicates the value of txn_scope TxnScope string @@ -268,6 +270,7 @@ func (tc *TransactionContext) Cleanup() { tc.TableDeltaMap = nil tc.tdmLock.Unlock() tc.pessimisticLockCache = nil + tc.IsStaleness = false } // ClearDelta clears the delta map. diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 472ed4162e..ee3d58f88a 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -345,6 +345,14 @@ func (s *tikvStore) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transa return txn, nil } +func (s *tikvStore) BeginWithExactStaleness(txnScope string, prevSec uint64) (kv.Transaction, error) { + txn, err := newTiKVTxnWithExactStaleness(s, txnScope, prevSec) + if err != nil { + return nil, errors.Trace(err) + } + return txn, nil +} + func (s *tikvStore) GetSnapshot(ver kv.Version) kv.Snapshot { snapshot := newTiKVSnapshot(s, ver, s.nextReplicaReadSeed()) return snapshot @@ -422,6 +430,19 @@ func (s *tikvStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint6 } } +func (s *tikvStore) getStalenessTimestamp(bo *Backoffer, txnScope string, prevSec uint64) (uint64, error) { + for { + startTS, err := s.oracle.GetStaleTimestamp(bo.ctx, txnScope, prevSec) + if err == nil { + return startTS, nil + } + err = bo.Backoff(BoPDRPC, errors.Errorf("get staleness timestamp failed: %v", err)) + if err != nil { + return 0, errors.Trace(err) + } + } +} + func (s *tikvStore) nextReplicaReadSeed() uint32 { return atomic.AddUint32(&s.replicaReadSeed, 1) } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 87a17947da..efc43f3730 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -113,6 +113,15 @@ func newTiKVTxnWithStartTS(store *tikvStore, txnScope string, startTS uint64, re return newTiKVTxn, nil } +func newTiKVTxnWithExactStaleness(store *tikvStore, txnScope string, prevSec uint64) (*tikvTxn, error) { + bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) + startTS, err := store.getStalenessTimestamp(bo, txnScope, prevSec) + if err != nil { + return nil, errors.Trace(err) + } + return newTiKVTxnWithStartTS(store, txnScope, startTS, store.nextReplicaReadSeed()) +} + type assertionPair struct { key kv.Key assertion kv.AssertionType diff --git a/util/mock/context.go b/util/mock/context.go index f8ace752d9..633c6afa34 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -200,6 +200,11 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error { return nil } +// NewTxnWithStalenessOption implements the sessionctx.Context interface. +func (c *Context) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error { + return c.NewTxn(ctx) +} + // GetStore gets the store of session. func (c *Context) GetStore() kv.Storage { return c.Store diff --git a/util/mock/store.go b/util/mock/store.go index 9fe0e762d2..6139fcb9ea 100644 --- a/util/mock/store.go +++ b/util/mock/store.go @@ -45,6 +45,11 @@ func (s *Store) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transactio return s.Begin() } +// BeginWithExactStaleness implements kv.Storage interface +func (s *Store) BeginWithExactStaleness(txnScope string, prevSec uint64) (kv.Transaction, error) { + return s.Begin() +} + // GetSnapshot implements kv.Storage interface. func (s *Store) GetSnapshot(ver kv.Version) kv.Snapshot { return nil }