executor: support exact staleness begin statement (#21897)
This commit is contained in:
@ -7383,3 +7383,64 @@ func (s *testSuite) TestIssue15563(c *C) {
|
||||
tk := testkit.NewTestKit(c, s.store)
|
||||
tk.MustQuery("select distinct 0.7544678906163867 / 0.68234634;").Check(testkit.Rows("1.10569639842486251190"))
|
||||
}
|
||||
|
||||
func (s *testSuite) TestStalenessTransaction(c *C) {
|
||||
testcases := []struct {
|
||||
name string
|
||||
preSQL string
|
||||
sql string
|
||||
IsStaleness bool
|
||||
expectPhysicalTS int64
|
||||
preSec int64
|
||||
}{
|
||||
{
|
||||
name: "TimestampBoundReadTimestamp",
|
||||
preSQL: "begin",
|
||||
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`,
|
||||
IsStaleness: true,
|
||||
expectPhysicalTS: 1599321600000,
|
||||
},
|
||||
{
|
||||
name: "TimestampBoundExactStaleness",
|
||||
preSQL: "begin",
|
||||
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:20';`,
|
||||
IsStaleness: true,
|
||||
preSec: 20,
|
||||
},
|
||||
{
|
||||
name: "TimestampBoundExactStaleness",
|
||||
preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`,
|
||||
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:20';`,
|
||||
IsStaleness: true,
|
||||
preSec: 20,
|
||||
},
|
||||
{
|
||||
name: "begin",
|
||||
preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`,
|
||||
sql: "begin",
|
||||
IsStaleness: false,
|
||||
},
|
||||
}
|
||||
tk := testkit.NewTestKit(c, s.store)
|
||||
tk.MustExec("use test")
|
||||
for _, testcase := range testcases {
|
||||
c.Log(testcase.name)
|
||||
tk.MustExec(testcase.preSQL)
|
||||
tk.MustExec(testcase.sql)
|
||||
if testcase.expectPhysicalTS > 0 {
|
||||
c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, testcase.expectPhysicalTS)
|
||||
} else if testcase.preSec > 0 {
|
||||
curSec := time.Now().Unix()
|
||||
startTS := oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS)
|
||||
c.Assert(startTS, Greater, (curSec-testcase.preSec-2)*1000)
|
||||
c.Assert(startTS, Less, (curSec-testcase.preSec+2)*1000)
|
||||
} else if !testcase.IsStaleness {
|
||||
curSec := time.Now().Unix()
|
||||
startTS := oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS)
|
||||
c.Assert(curSec*1000-startTS, Less, time.Second/time.Millisecond)
|
||||
c.Assert(startTS-curSec*1000, Less, time.Second/time.Millisecond)
|
||||
}
|
||||
c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, testcase.IsStaleness)
|
||||
tk.MustExec("commit")
|
||||
}
|
||||
}
|
||||
|
||||
@ -37,7 +37,10 @@ import (
|
||||
"github.com/pingcap/tidb/privilege"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
"github.com/pingcap/tidb/table"
|
||||
"github.com/pingcap/tidb/types"
|
||||
driver "github.com/pingcap/tidb/types/parser_driver"
|
||||
"github.com/pingcap/tidb/util"
|
||||
"github.com/pingcap/tidb/util/chunk"
|
||||
"github.com/pingcap/tidb/util/dbterror"
|
||||
@ -557,10 +560,17 @@ func (e *SimpleExec) executeUse(s *ast.UseStmt) error {
|
||||
}
|
||||
|
||||
func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
|
||||
// If `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND` is the first statement in TxnCtx, we should
|
||||
// always create a new Txn instead of reusing it.
|
||||
if s.ReadOnly && s.Bound != nil {
|
||||
return e.executeStartTransactionReadOnlyWithTimestampBound(ctx, s)
|
||||
}
|
||||
// If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the
|
||||
// need to call NewTxn, which commits the existing transaction and begins a new one.
|
||||
// If the last un-committed/un-rollback transaction is a time-bounded read-only transaction, we should
|
||||
// always create a new transaction.
|
||||
txnCtx := e.ctx.GetSessionVars().TxnCtx
|
||||
if txnCtx.History != nil {
|
||||
if txnCtx.History != nil || txnCtx.IsStaleness {
|
||||
err := e.ctx.NewTxn(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -588,6 +598,49 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx context.Context, s *ast.BeginStmt) error {
|
||||
opt := sessionctx.StalenessTxnOption{}
|
||||
opt.Mode = s.Bound.Mode
|
||||
switch s.Bound.Mode {
|
||||
case ast.TimestampBoundReadTimestamp:
|
||||
// TODO: support funcCallExpr in future
|
||||
v, ok := s.Bound.Timestamp.(*driver.ValueExpr)
|
||||
if !ok {
|
||||
return errors.New("Invalid value for Bound Timestamp")
|
||||
}
|
||||
t, err := types.ParseTime(e.ctx.GetSessionVars().StmtCtx, v.GetString(), v.GetType().Tp, types.GetFsp(v.GetString()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
gt, err := t.GoTime(e.ctx.GetSessionVars().TimeZone)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
startTS := oracle.ComposeTS(gt.Unix()*1000, 0)
|
||||
opt.StartTS = startTS
|
||||
case ast.TimestampBoundExactStaleness:
|
||||
// TODO: support funcCallExpr in future
|
||||
v, ok := s.Bound.Timestamp.(*driver.ValueExpr)
|
||||
if !ok {
|
||||
return errors.New("Invalid value for Bound Timestamp")
|
||||
}
|
||||
d, err := types.ParseDuration(e.ctx.GetSessionVars().StmtCtx, v.GetString(), types.GetFsp(v.GetString()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
opt.PrevSec = uint64(d.Seconds())
|
||||
}
|
||||
err := e.ctx.NewTxnWithStalenessOption(ctx, opt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// With START TRANSACTION, autocommit remains disabled until you end
|
||||
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
|
||||
// reverts to its previous state.
|
||||
e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *SimpleExec) executeRevokeRole(s *ast.RevokeRoleStmt) error {
|
||||
for _, role := range s.Roles {
|
||||
exists, err := userExists(e.ctx, role.Username, role.Hostname)
|
||||
|
||||
@ -162,6 +162,11 @@ func (s *mockStorage) BeginWithStartTS(txnScope string, startTS uint64) (Transac
|
||||
return s.Begin()
|
||||
}
|
||||
|
||||
// BeginWithExactStaleness begins transaction with given exact staleness
|
||||
func (s *mockStorage) BeginWithExactStaleness(txnScope string, prevSec uint64) (Transaction, error) {
|
||||
return s.Begin()
|
||||
}
|
||||
|
||||
func (s *mockStorage) GetSnapshot(ver Version) Snapshot {
|
||||
return &mockSnapshot{
|
||||
store: newMemDB(),
|
||||
|
||||
4
kv/kv.go
4
kv/kv.go
@ -66,6 +66,8 @@ const (
|
||||
GuaranteeExternalConsistency
|
||||
// TxnScope indicates which @@txn_scope this transaction will work with.
|
||||
TxnScope
|
||||
// StalenessReadOnly indicates whether the transaction is staleness read only transaction
|
||||
IsStalenessReadOnly
|
||||
)
|
||||
|
||||
// Priority value for transaction priority.
|
||||
@ -464,6 +466,8 @@ type Storage interface {
|
||||
BeginWithTxnScope(txnScope string) (Transaction, error)
|
||||
// BeginWithStartTS begins transaction with given txnScope and startTS.
|
||||
BeginWithStartTS(txnScope string, startTS uint64) (Transaction, error)
|
||||
// BeginWithStalenessTS begins transaction with given staleness
|
||||
BeginWithExactStaleness(txnScope string, prevSec uint64) (Transaction, error)
|
||||
// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
|
||||
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
|
||||
GetSnapshot(ver Version) Snapshot
|
||||
|
||||
@ -1651,6 +1651,7 @@ func (s *session) NewTxn(ctx context.Context) error {
|
||||
CreateTime: time.Now(),
|
||||
StartTS: txn.StartTS(),
|
||||
ShardStep: int(s.sessionVars.ShardAllocateStep),
|
||||
IsStaleness: false,
|
||||
TxnScope: s.sessionVars.CheckAndGetTxnScope(),
|
||||
}
|
||||
return nil
|
||||
@ -2462,6 +2463,56 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewTxnWithStalenessOption create a transaction with Staleness option
|
||||
func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error {
|
||||
if s.txn.Valid() {
|
||||
txnID := s.txn.StartTS()
|
||||
txnScope := s.txn.GetUnionStore().GetOption(kv.TxnScope).(string)
|
||||
err := s.CommitTxn(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
vars := s.GetSessionVars()
|
||||
logutil.Logger(ctx).Info("InitTxnWithExactStaleness() inside a transaction auto commit",
|
||||
zap.Int64("schemaVersion", vars.TxnCtx.SchemaVersion),
|
||||
zap.Uint64("txnStartTS", txnID),
|
||||
zap.String("txnScope", txnScope))
|
||||
}
|
||||
var txn kv.Transaction
|
||||
var err error
|
||||
txnScope := s.GetSessionVars().TxnScope
|
||||
switch option.Mode {
|
||||
case ast.TimestampBoundReadTimestamp:
|
||||
txn, err = s.store.BeginWithStartTS(txnScope, option.StartTS)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case ast.TimestampBoundExactStaleness:
|
||||
txn, err = s.store.BeginWithExactStaleness(txnScope, option.PrevSec)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
// For unsupported staleness txn cases, fallback to NewTxn
|
||||
return s.NewTxn(ctx)
|
||||
}
|
||||
txn.SetVars(s.sessionVars.KVVars)
|
||||
txn.SetOption(kv.IsStalenessReadOnly, true)
|
||||
txn.SetOption(kv.TxnScope, txnScope)
|
||||
s.txn.changeInvalidToValid(txn)
|
||||
is := domain.GetDomain(s).InfoSchema()
|
||||
s.sessionVars.TxnCtx = &variable.TransactionContext{
|
||||
InfoSchema: is,
|
||||
SchemaVersion: is.SchemaMetaVersion(),
|
||||
CreateTime: time.Now(),
|
||||
StartTS: txn.StartTS(),
|
||||
ShardStep: int(s.sessionVars.ShardAllocateStep),
|
||||
IsStaleness: true,
|
||||
TxnScope: txnScope,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetStore gets the store of session.
|
||||
func (s *session) GetStore() kv.Storage {
|
||||
return s.store
|
||||
|
||||
@ -17,6 +17,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/pingcap/parser/ast"
|
||||
"github.com/pingcap/parser/model"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/owner"
|
||||
@ -71,6 +72,9 @@ type Context interface {
|
||||
// It should be called right before we builds an executor.
|
||||
InitTxnWithStartTS(startTS uint64) error
|
||||
|
||||
// NewTxnWithStalenessOption initializes a transaction with StalenessTxnOption
|
||||
NewTxnWithStalenessOption(ctx context.Context, option StalenessTxnOption) error
|
||||
|
||||
// GetStore returns the store of session.
|
||||
GetStore() kv.Storage
|
||||
|
||||
@ -144,3 +148,10 @@ var ConnID = connIDCtxKeyType{}
|
||||
func SetCommitCtx(ctx context.Context, sessCtx Context) context.Context {
|
||||
return context.WithValue(ctx, ConnID, sessCtx.GetSessionVars().ConnectionID)
|
||||
}
|
||||
|
||||
// StalenessTxnOption represents available options for the InitTxnWithStaleness
|
||||
type StalenessTxnOption struct {
|
||||
Mode ast.TimestampBoundMode
|
||||
PrevSec uint64
|
||||
StartTS uint64
|
||||
}
|
||||
|
||||
@ -162,9 +162,11 @@ type TransactionContext struct {
|
||||
StatementCount int
|
||||
CouldRetry bool
|
||||
IsPessimistic bool
|
||||
Isolation string
|
||||
LockExpire uint32
|
||||
ForUpdate uint32
|
||||
// IsStaleness indicates whether the txn is read only staleness txn.
|
||||
IsStaleness bool
|
||||
Isolation string
|
||||
LockExpire uint32
|
||||
ForUpdate uint32
|
||||
// TxnScope indicates the value of txn_scope
|
||||
TxnScope string
|
||||
|
||||
@ -268,6 +270,7 @@ func (tc *TransactionContext) Cleanup() {
|
||||
tc.TableDeltaMap = nil
|
||||
tc.tdmLock.Unlock()
|
||||
tc.pessimisticLockCache = nil
|
||||
tc.IsStaleness = false
|
||||
}
|
||||
|
||||
// ClearDelta clears the delta map.
|
||||
|
||||
@ -345,6 +345,14 @@ func (s *tikvStore) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transa
|
||||
return txn, nil
|
||||
}
|
||||
|
||||
func (s *tikvStore) BeginWithExactStaleness(txnScope string, prevSec uint64) (kv.Transaction, error) {
|
||||
txn, err := newTiKVTxnWithExactStaleness(s, txnScope, prevSec)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return txn, nil
|
||||
}
|
||||
|
||||
func (s *tikvStore) GetSnapshot(ver kv.Version) kv.Snapshot {
|
||||
snapshot := newTiKVSnapshot(s, ver, s.nextReplicaReadSeed())
|
||||
return snapshot
|
||||
@ -422,6 +430,19 @@ func (s *tikvStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint6
|
||||
}
|
||||
}
|
||||
|
||||
func (s *tikvStore) getStalenessTimestamp(bo *Backoffer, txnScope string, prevSec uint64) (uint64, error) {
|
||||
for {
|
||||
startTS, err := s.oracle.GetStaleTimestamp(bo.ctx, txnScope, prevSec)
|
||||
if err == nil {
|
||||
return startTS, nil
|
||||
}
|
||||
err = bo.Backoff(BoPDRPC, errors.Errorf("get staleness timestamp failed: %v", err))
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *tikvStore) nextReplicaReadSeed() uint32 {
|
||||
return atomic.AddUint32(&s.replicaReadSeed, 1)
|
||||
}
|
||||
|
||||
@ -113,6 +113,15 @@ func newTiKVTxnWithStartTS(store *tikvStore, txnScope string, startTS uint64, re
|
||||
return newTiKVTxn, nil
|
||||
}
|
||||
|
||||
func newTiKVTxnWithExactStaleness(store *tikvStore, txnScope string, prevSec uint64) (*tikvTxn, error) {
|
||||
bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
|
||||
startTS, err := store.getStalenessTimestamp(bo, txnScope, prevSec)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return newTiKVTxnWithStartTS(store, txnScope, startTS, store.nextReplicaReadSeed())
|
||||
}
|
||||
|
||||
type assertionPair struct {
|
||||
key kv.Key
|
||||
assertion kv.AssertionType
|
||||
|
||||
@ -200,6 +200,11 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewTxnWithStalenessOption implements the sessionctx.Context interface.
|
||||
func (c *Context) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error {
|
||||
return c.NewTxn(ctx)
|
||||
}
|
||||
|
||||
// GetStore gets the store of session.
|
||||
func (c *Context) GetStore() kv.Storage {
|
||||
return c.Store
|
||||
|
||||
@ -45,6 +45,11 @@ func (s *Store) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transactio
|
||||
return s.Begin()
|
||||
}
|
||||
|
||||
// BeginWithExactStaleness implements kv.Storage interface
|
||||
func (s *Store) BeginWithExactStaleness(txnScope string, prevSec uint64) (kv.Transaction, error) {
|
||||
return s.Begin()
|
||||
}
|
||||
|
||||
// GetSnapshot implements kv.Storage interface.
|
||||
func (s *Store) GetSnapshot(ver kv.Version) kv.Snapshot { return nil }
|
||||
|
||||
|
||||
Reference in New Issue
Block a user