*: polish the table cache feature code (#33408)
close pingcap/tidb#33167
@@ -1173,11 +1173,13 @@ func (us *UnionScanExec) handleCachedTable(b *executorBuilder, x bypassDataSourc
cachedTable := tbl.(table.CachedTable)
// Determine whether the cache can be used.
leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
cacheData, loading := cachedTable.TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
vars.StmtCtx.ReadFromTableCache = true
x.setDummy()
us.cacheTable = cacheData
} else if loading {
// continue
} else {
if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !vars.StmtCtx.InExplainStmt {
store := b.ctx.GetStore()
@@ -4969,13 +4971,16 @@ func (b *executorBuilder) getCacheTable(tblInfo *model.TableInfo, startTS uint64
}
sessVars := b.ctx.GetSessionVars()
leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
cacheData := tbl.(table.CachedTable).TryReadFromCache(startTS, leaseDuration)
cacheData, loading := tbl.(table.CachedTable).TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
sessVars.StmtCtx.ReadFromTableCache = true
return cacheData
}
if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt {
tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS, leaseDuration)
} else if loading {
// continue
} else {
if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt {
tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS, leaseDuration)
}
}
return nil
}
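The new (cacheData, loading) pair gives the executor three distinct outcomes: serve from cache, fall through while another goroutine is still filling the cache, or fall through and trigger a lock-for-read. A minimal, self-contained sketch of that three-way probe; the cache type and names below are stand-ins, not TiDB's actual API:

```go
package main

import "fmt"

// cache mimics the (kv.MemBuffer, bool) pair returned by TryReadFromCache:
// data != nil means a usable snapshot; loading means another goroutine is filling it.
type cache struct {
	data    []byte
	loading bool
}

// tryRead is a stand-in for CachedTable.TryReadFromCache.
func (c *cache) tryRead() (data []byte, loading bool) {
	return c.data, c.loading
}

func readPath(c *cache) string {
	data, loading := c.tryRead()
	switch {
	case data != nil:
		return "serve from cache" // like setting ReadFromTableCache and x.setDummy()
	case loading:
		return "fall through to TiKV; the cache is being filled" // the `} else if loading` branch
	default:
		return "fall through to TiKV and trigger UpdateLockForRead"
	}
}

func main() {
	fmt.Println(readPath(&cache{data: []byte("rows")}))
	fmt.Println(readPath(&cache{loading: true}))
	fmt.Println(readPath(&cache{}))
}
```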
@@ -163,6 +163,7 @@ func RegisterMetrics() {
prometheus.MustRegister(PDApiExecutionHistogram)
prometheus.MustRegister(CPUProfileCounter)
prometheus.MustRegister(ReadFromTableCacheCounter)
prometheus.MustRegister(LoadTableCacheDurationHistogram)

tikvmetrics.InitMetrics(TiDB, TiKVClient)
tikvmetrics.RegisterMetrics()

@@ -254,6 +254,15 @@ var (
Name: "cpu_profile_total",
Help: "Counter of cpu profiling",
})

LoadTableCacheDurationHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "load_table_cache_seconds",
Help: "Duration (us) for loading table cache.",
Buckets: prometheus.ExponentialBuckets(1, 2, 30), // 1us ~ 528s
})
)

// ExecuteErrorToLabel converts an execute error to label.

@@ -51,7 +51,7 @@ import (
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/util/logutil/consistency"
"github.com/pingcap/tidb/util/topsql"
@@ -632,10 +632,11 @@ type cachedTableRenewLease struct {
func (c *cachedTableRenewLease) start(ctx context.Context) error {
c.exit = make(chan struct{})
c.lease = make([]uint64, len(c.tables))
wg := make(chan error)
wg := make(chan error, len(c.tables))
ith := 0
for tid, raw := range c.tables {
go c.keepAlive(ctx, wg, raw.(tables.StateRemote), tid, &c.lease[ith])
for _, raw := range c.tables {
tbl := raw.(table.CachedTable)
go tbl.WriteLockAndKeepAlive(ctx, c.exit, &c.lease[ith], wg)
ith++
}

@@ -650,47 +651,6 @@ func (c *cachedTableRenewLease) start(ctx context.Context) error {
return err
}

const cacheTableWriteLease = 5 * time.Second

func (c *cachedTableRenewLease) keepAlive(ctx context.Context, wg chan error, handle tables.StateRemote, tid int64, leasePtr *uint64) {
writeLockLease, err := handle.LockForWrite(ctx, tid, cacheTableWriteLease)
atomic.StoreUint64(leasePtr, writeLockLease)
wg <- err
if err != nil {
logutil.Logger(ctx).Warn("[cached table] lock for write lock fail", zap.Error(err))
return
}

t := time.NewTicker(cacheTableWriteLease)
defer t.Stop()
for {
select {
case <-t.C:
if err := c.renew(ctx, handle, tid, leasePtr); err != nil {
logutil.Logger(ctx).Warn("[cached table] renew write lock lease fail", zap.Error(err))
return
}
case <-c.exit:
return
}
}
}

func (c *cachedTableRenewLease) renew(ctx context.Context, handle tables.StateRemote, tid int64, leasePtr *uint64) error {
oldLease := atomic.LoadUint64(leasePtr)
physicalTime := oracle.GetTimeFromTS(oldLease)
newLease := oracle.GoTimeToTS(physicalTime.Add(cacheTableWriteLease))

succ, err := handle.RenewWriteLease(ctx, tid, newLease)
if err != nil {
return errors.Trace(err)
}
if succ {
atomic.StoreUint64(leasePtr, newLease)
}
return nil
}

func (c *cachedTableRenewLease) stop(ctx context.Context) {
close(c.exit)
}

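Sizing the error channel by len(c.tables) is what lets start() bail out early without stranding the remaining WriteLockAndKeepAlive goroutines on their `wg <- err` send. A small illustration of that buffered fan-in, under the assumption that the receiver may stop reading after the first failure; the worker names are made up:

```go
package main

import (
	"errors"
	"fmt"
)

func worker(id int, wg chan error) {
	var err error
	if id == 1 {
		err = errors.New("lock for write failed")
	}
	// With a buffered channel this send never blocks, even if the
	// receiver bails out early after seeing the first error.
	wg <- err
}

func main() {
	n := 3
	wg := make(chan error, n) // buffered, one slot per worker
	for i := 0; i < n; i++ {
		go worker(i, wg)
	}
	for i := 0; i < n; i++ {
		if err := <-wg; err != nil {
			fmt.Println("start failed:", err)
			// Returning here is safe: the remaining senders won't be stuck.
			return
		}
	}
	fmt.Println("all write locks acquired")
}
```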
@@ -260,9 +260,14 @@ type CachedTable interface {
Init(exec sqlexec.SQLExecutor) error

// TryReadFromCache checks if the cache table is readable.
TryReadFromCache(ts uint64, leaseDuration time.Duration) kv.MemBuffer
TryReadFromCache(ts uint64, leaseDuration time.Duration) (kv.MemBuffer, bool)

// UpdateLockForRead is called when the conditions for reading from the cache buffer are not met;
// it updates the lock information and reads the data from the original table.
UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64, leaseDuration time.Duration)

// WriteLockAndKeepAlive first obtains the write lock, then renews the lease to keep the lock alive.
// 'exit' is a channel that tells the keep-alive goroutine to exit.
// The result is sent to the 'wg' channel.
WriteLockAndKeepAlive(ctx context.Context, exit chan struct{}, leasePtr *uint64, wg chan error)
}

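Read callers are expected to drive this contract as in the executor hunks above: probe first, and only trigger UpdateLockForRead on a real miss, never while a load is already in flight. A trimmed imitation of that contract with stand-in types (not the real kv/table packages):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// memBuffer stands in for kv.MemBuffer.
type memBuffer struct{ rows int }

// cachedTable mirrors the shape of the CachedTable read-path methods.
type cachedTable interface {
	TryReadFromCache(ts uint64, leaseDuration time.Duration) (*memBuffer, bool)
	UpdateLockForRead(ctx context.Context, ts uint64, leaseDuration time.Duration)
}

type fakeTable struct {
	buf     *memBuffer
	loading bool
}

func (t *fakeTable) TryReadFromCache(ts uint64, d time.Duration) (*memBuffer, bool) {
	return t.buf, t.loading
}

func (t *fakeTable) UpdateLockForRead(ctx context.Context, ts uint64, d time.Duration) {
	t.loading = true // in the real table this locks for read and loads data asynchronously
}

func read(ctx context.Context, tbl cachedTable, ts uint64) string {
	if buf, loading := tbl.TryReadFromCache(ts, 3*time.Second); buf != nil {
		return fmt.Sprintf("read %d rows from cache", buf.rows)
	} else if loading {
		return "cache is loading; read the original table this time"
	}
	tbl.UpdateLockForRead(ctx, ts, 3*time.Second) // fire-and-forget; a later read may hit
	return "cache miss; lock-for-read triggered, read the original table"
}

func main() {
	tbl := &fakeTable{}
	fmt.Println(read(context.Background(), tbl, 1)) // miss
	fmt.Println(read(context.Background(), tbl, 2)) // loading
	tbl.buf, tbl.loading = &memBuffer{rows: 42}, false
	fmt.Println(read(context.Background(), tbl, 3)) // hit
}
```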
@@ -23,10 +23,12 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
@@ -40,13 +42,30 @@ var (
type cachedTable struct {
TableCommon
cacheData atomic.Value
handle StateRemote
totalSize int64

renewReadLease tokenLimit
// StateRemote is not thread-safe; this tokenLimit is used to keep only one visitor at a time.
tokenLimit
}

type tokenLimit = chan struct{}
type tokenLimit chan StateRemote

func (t tokenLimit) TakeStateRemoteHandle() StateRemote {
handle := <-t
return handle
}

func (t tokenLimit) TakeStateRemoteHandleNoWait() StateRemote {
select {
case handle := <-t:
return handle
default:
return nil
}
}

func (t tokenLimit) PutStateRemoteHandle(handle StateRemote) {
t <- handle
}

// cacheData packs the cache data and lease.
type cacheData struct {
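tokenLimit is a capacity-1 channel that carries the StateRemote handle itself, so taking the token and gaining exclusive use of the non-thread-safe handle are one operation, and the non-blocking take doubles as a "someone is already renewing" probe. A generic sketch of the idiom; the resource type is illustrative:

```go
package main

import "fmt"

// resource stands in for the non-thread-safe StateRemote handle.
type resource struct{ name string }

// token is a capacity-1 channel that owns the resource: whoever receives
// the value is the only user until they put it back.
type token chan *resource

func (t token) take() *resource { return <-t }

// takeNoWait returns nil when the resource is busy instead of blocking,
// like TakeStateRemoteHandleNoWait.
func (t token) takeNoWait() *resource {
	select {
	case r := <-t:
		return r
	default:
		return nil
	}
}

func (t token) put(r *resource) { t <- r }

func main() {
	t := make(token, 1)
	t.put(&resource{name: "state-remote"})

	r := t.take() // exclusive access
	if t.takeNoWait() == nil {
		fmt.Println("busy: skip this renewal instead of queueing a second one")
	}
	t.put(r)

	if r2 := t.takeNoWait(); r2 != nil {
		fmt.Println("got", r2.name, "without blocking")
		t.put(r2)
	}
}
```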
@@ -71,10 +90,10 @@ func newMemBuffer(store kv.Storage) (kv.MemBuffer, error) {
return buffTxn.GetMemBuffer(), nil
}

func (c *cachedTable) TryReadFromCache(ts uint64, leaseDuration time.Duration) kv.MemBuffer {
func (c *cachedTable) TryReadFromCache(ts uint64, leaseDuration time.Duration) (kv.MemBuffer, bool /*loading*/) {
tmp := c.cacheData.Load()
if tmp == nil {
return nil
return nil, false
}
data := tmp.(*cacheData)
if ts >= data.Start && ts < data.Lease {
@@ -88,22 +107,22 @@ func (c *cachedTable) TryReadFromCache(ts uint64, leaseDuration time.Duration) k
})

if distance >= 0 && distance <= leaseDuration/2 || triggerFailpoint {
select {
case c.renewReadLease <- struct{}{}:
go c.renewLease(ts, data, leaseDuration)
default:
if h := c.TakeStateRemoteHandleNoWait(); h != nil {
go c.renewLease(h, ts, data, leaseDuration)
}
}
return data.MemBuffer
// If data is not nil, but data.MemBuffer is nil, it means the data is being
// loaded by a background goroutine.
return data.MemBuffer, data.MemBuffer == nil
}
return nil
return nil, false
}

// newCachedTable creates a new CachedTable instance.
func newCachedTable(tbl *TableCommon) (table.Table, error) {
ret := &cachedTable{
TableCommon: *tbl,
renewReadLease: make(chan struct{}, 1),
TableCommon: *tbl,
tokenLimit: make(chan StateRemote, 1),
}
return ret, nil
}
@@ -115,11 +134,12 @@ func (c *cachedTable) Init(exec sqlexec.SQLExecutor) error {
if !ok {
return errors.New("Need sqlExec rather than sqlexec.SQLExecutor")
}
c.handle = NewStateRemote(raw)
handle := NewStateRemote(raw)
c.PutStateRemoteHandle(handle)
return nil
}

func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) (kv.MemBuffer, uint64, int64, error) {
func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage) (kv.MemBuffer, uint64, int64, error) {
buffer, err := newMemBuffer(store)
if err != nil {
return nil, 0, 0, err
@@ -132,9 +152,6 @@ func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64)
return errors.Trace(err)
}
startTS = txn.StartTS()
if startTS >= lease {
return errors.New("the loaded data is outdated for caching")
}
it, err := txn.Iter(prefix, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
@@ -165,45 +182,57 @@ func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64)
}

func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64, leaseDuration time.Duration) {
select {
case c.renewReadLease <- struct{}{}:
go c.updateLockForRead(ctx, store, ts, leaseDuration)
default:
// There is a inflight calling already.
if h := c.TakeStateRemoteHandle(); h != nil {
go c.updateLockForRead(ctx, h, store, ts, leaseDuration)
}
}

func (c *cachedTable) updateLockForRead(ctx context.Context, store kv.Storage, ts uint64, leaseDuration time.Duration) {
func (c *cachedTable) updateLockForRead(ctx context.Context, handle StateRemote, store kv.Storage, ts uint64, leaseDuration time.Duration) {
defer func() {
if r := recover(); r != nil {
log.Error("panic in the recoverable goroutine",
zap.Reflect("r", r),
zap.Stack("stack trace"))
}
<-c.renewReadLease
c.PutStateRemoteHandle(handle)
}()

// Load data from the original table and update the lock information.
tid := c.Meta().ID
lease := leaseFromTS(ts, leaseDuration)
succ, err := c.handle.LockForRead(ctx, tid, lease)
succ, err := handle.LockForRead(ctx, tid, lease)
if err != nil {
log.Warn("lock cached table for read", zap.Error(err))
return
}
if succ {
mb, startTS, totalSize, err := c.loadDataFromOriginalTable(store, lease)
if err != nil {
log.Info("load data from table", zap.Error(err))
return
}

c.cacheData.Store(&cacheData{
Start: startTS,
Start: ts,
Lease: lease,
MemBuffer: mb,
MemBuffer: nil, // Async loading, this will be set later.
})
atomic.StoreInt64(&c.totalSize, totalSize)

// Make the data loading process async, in case loading the data takes longer than the
// lease duration; otherwise the loaded data gets stale and that process repeats forever.
go func() {
start := time.Now()
mb, startTS, totalSize, err := c.loadDataFromOriginalTable(store)
metrics.LoadTableCacheDurationHistogram.Observe(time.Since(start).Seconds())
if err != nil {
log.Info("load data from table fail", zap.Error(err))
return
}

tmp := c.cacheData.Load().(*cacheData)
if tmp != nil && tmp.Start == ts {
c.cacheData.Store(&cacheData{
Start: startTS,
Lease: tmp.Lease,
MemBuffer: mb,
})
atomic.StoreInt64(&c.totalSize, totalSize)
}
}()
}
// Current status is not suitable to cache.
}
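updateLockForRead now publishes a placeholder entry (MemBuffer nil) first, loads in the background, and installs the loaded buffer only if the entry it finds afterwards still belongs to the same generation (tmp.Start == ts), so a slow load cannot overwrite a newer cache state. A condensed sketch of that publish-if-still-current pattern with atomic.Value; the names are illustrative:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type entry struct {
	start uint64 // identifies the generation that started this load
	data  []byte // nil while the background load is in flight
}

type cache struct{ v atomic.Value }

// beginLoad publishes a placeholder so readers can tell "loading" from "miss".
func (c *cache) beginLoad(start uint64) { c.v.Store(&entry{start: start}) }

// finishLoad publishes data only if the placeholder it created is still current.
func (c *cache) finishLoad(start uint64, data []byte) bool {
	cur, _ := c.v.Load().(*entry)
	if cur == nil || cur.start != start {
		return false // a newer generation took over; drop the stale result
	}
	c.v.Store(&entry{start: start, data: data})
	return true
}

func main() {
	var c cache
	c.beginLoad(100)
	c.beginLoad(200) // a newer read re-armed the cache before the first load finished
	fmt.Println("stale install accepted?", c.finishLoad(100, []byte("old rows")))
	fmt.Println("fresh install accepted?", c.finishLoad(200, []byte("new rows")))
}
```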
@@ -215,11 +244,11 @@ func (c *cachedTable) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
if atomic.LoadInt64(&c.totalSize) > cachedTableSizeLimit {
return nil, table.ErrOptOnCacheTable.GenWithStackByArgs("table too large")
}
txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle)
txnCtxAddCachedTable(sctx, c.Meta().ID, c)
return c.TableCommon.AddRecord(sctx, r, opts...)
}

func txnCtxAddCachedTable(sctx sessionctx.Context, tid int64, handle StateRemote) {
func txnCtxAddCachedTable(sctx sessionctx.Context, tid int64, handle *cachedTable) {
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.CachedTables == nil {
txnCtx.CachedTables = make(map[int64]interface{})
@@ -235,29 +264,31 @@ func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context,
if atomic.LoadInt64(&c.totalSize) > cachedTableSizeLimit {
return table.ErrOptOnCacheTable.GenWithStackByArgs("table too large")
}
txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle)
txnCtxAddCachedTable(sctx, c.Meta().ID, c)
return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched)
}

// RemoveRecord implements table.Table RemoveRecord interface.
func (c *cachedTable) RemoveRecord(sctx sessionctx.Context, h kv.Handle, r []types.Datum) error {
txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle)
txnCtxAddCachedTable(sctx, c.Meta().ID, c)
return c.TableCommon.RemoveRecord(sctx, h, r)
}

// TestMockRenewLeaseABA2 is used by test function TestRenewLeaseABAFailPoint.
var TestMockRenewLeaseABA2 chan struct{}

func (c *cachedTable) renewLease(ts uint64, data *cacheData, leaseDuration time.Duration) {
defer func() { <-c.renewReadLease }()

func (c *cachedTable) renewLease(handle StateRemote, ts uint64, data *cacheData, leaseDuration time.Duration) {
failpoint.Inject("mockRenewLeaseABA2", func(_ failpoint.Value) {
c.PutStateRemoteHandle(handle)
<-TestMockRenewLeaseABA2
c.TakeStateRemoteHandle()
})

defer c.PutStateRemoteHandle(handle)

tid := c.Meta().ID
lease := leaseFromTS(ts, leaseDuration)
newLease, err := c.handle.RenewReadLease(context.Background(), tid, data.Lease, lease)
newLease, err := handle.RenewReadLease(context.Background(), tid, data.Lease, lease)
if err != nil && !kv.IsTxnRetryableError(err) {
log.Warn("Renew read lease error", zap.Error(err))
}
@@ -273,3 +304,54 @@ func (c *cachedTable) renewLease(ts uint64, data *cacheData, leaseDuration time.
TestMockRenewLeaseABA2 <- struct{}{}
})
}

const cacheTableWriteLease = 5 * time.Second

func (c *cachedTable) WriteLockAndKeepAlive(ctx context.Context, exit chan struct{}, leasePtr *uint64, wg chan error) {
writeLockLease, err := c.lockForWrite(ctx)
atomic.StoreUint64(leasePtr, writeLockLease)
wg <- err
if err != nil {
logutil.Logger(ctx).Warn("[cached table] lock for write lock fail", zap.Error(err))
return
}

t := time.NewTicker(cacheTableWriteLease)
defer t.Stop()
for {
select {
case <-t.C:
if err := c.renew(ctx, leasePtr); err != nil {
logutil.Logger(ctx).Warn("[cached table] renew write lock lease fail", zap.Error(err))
return
}
case <-exit:
return
}
}
}

func (c *cachedTable) renew(ctx context.Context, leasePtr *uint64) error {
oldLease := atomic.LoadUint64(leasePtr)
physicalTime := oracle.GetTimeFromTS(oldLease)
newLease := oracle.GoTimeToTS(physicalTime.Add(cacheTableWriteLease))

h := c.TakeStateRemoteHandle()
defer c.PutStateRemoteHandle(h)

succ, err := h.RenewWriteLease(ctx, c.Meta().ID, newLease)
if err != nil {
return errors.Trace(err)
}
if succ {
atomic.StoreUint64(leasePtr, newLease)
}
return nil
}

func (c *cachedTable) lockForWrite(ctx context.Context) (uint64, error) {
handle := c.TakeStateRemoteHandle()
defer c.PutStateRemoteHandle(handle)

return handle.LockForWrite(ctx, c.Meta().ID, cacheTableWriteLease)
}

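renew() does its arithmetic in TSO space: decode the old lease to a physical time, push it forward by cacheTableWriteLease, and re-encode. A tiny standalone illustration using the tikv/client-go oracle helpers; the values are made up:

```go
package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

const writeLease = 5 * time.Second // mirrors cacheTableWriteLease

func main() {
	// Pretend this is the currently held write-lock lease (a TSO).
	oldLease := oracle.GoTimeToTS(time.Now())

	// renew(): decode the physical time, push it forward, re-encode as a TSO.
	physical := oracle.GetTimeFromTS(oldLease)
	newLease := oracle.GoTimeToTS(physical.Add(writeLease))

	fmt.Println("old lease expires:", physical)
	fmt.Println("new lease expires:", oracle.GetTimeFromTS(newLease))
	fmt.Println("extended by:", oracle.GetTimeFromTS(newLease).Sub(physical))
}
```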
@@ -17,7 +17,6 @@ package tables
import (
"context"
"strconv"
"sync"
"time"

"github.com/pingcap/errors"
@@ -56,6 +55,7 @@ func (l CachedTableLockType) String() string {
}

// StateRemote is the interface to control the remote state of the cached table's lock meta information.
// IMPORTANT: It's not thread-safe; the caller should be aware of that!
type StateRemote interface {
// Load obtains the corresponding lock type and lease value according to the tableID
Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error)
@@ -82,7 +82,13 @@ type sqlExec interface {

type stateRemoteHandle struct {
exec sqlExec
sync.Mutex

// Local state; it could be stale.
// Since stateRemoteHandle is used in a single thread, it's safe for all operations
// to check the local state first to avoid unnecessary remote TiKV access.
lockType CachedTableLockType
lease uint64
oldReadLease uint64
}

// NewStateRemote creates a StateRemote object.
@@ -95,16 +101,22 @@ func NewStateRemote(exec sqlExec) *stateRemoteHandle {
var _ StateRemote = &stateRemoteHandle{}

func (h *stateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) {
lockType, lease, _, err := h.loadRow(ctx, tid)
lockType, lease, _, err := h.loadRow(ctx, tid, false)
return lockType, lease, err
}

func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, newLease uint64) ( /*succ*/ bool, error) {
h.Lock()
defer h.Unlock()
succ := false
err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error {
lockType, lease, _, err := h.loadRow(ctx, tid)
if h.lease >= newLease {
// There is a write lock or intention, don't lock for read.
switch h.lockType {
case CachedTableLockIntend, CachedTableLockWrite:
return false, nil
}
}

err := h.runInTxn(ctx, false, func(ctx context.Context, now uint64) error {
lockType, lease, _, err := h.loadRow(ctx, tid, false)
if err != nil {
return errors.Trace(err)
}
@@ -137,9 +149,17 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, newLease

// LockForWrite try to add a write lock to the table with the specified tableID, return the write lock lease.
func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, leaseDuration time.Duration) (uint64, error) {
h.Lock()
defer h.Unlock()
var ret uint64

if h.lockType == CachedTableLockWrite {
safe := oracle.GoTimeToTS(time.Now().Add(leaseDuration / 2))
if h.lease > safe {
// It means the remote has already been write locked and the lock will be valid for a while.
// So we can return directly.
return h.lease, nil
}
}

for {
waitAndRetry, lease, err := h.lockForWriteOnce(ctx, tid, leaseDuration)
if err != nil {
@@ -155,8 +175,15 @@
}

func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, leaseDuration time.Duration) (waitAndRetry time.Duration, ts uint64, err error) {
err = h.runInTxn(ctx, func(ctx context.Context, now uint64) error {
lockType, lease, oldReadLease, err := h.loadRow(ctx, tid)
var (
_updateLocal bool
_lockType CachedTableLockType
_lease uint64
_oldReadLease uint64
)

err = h.runInTxn(ctx, true, func(ctx context.Context, now uint64) error {
lockType, lease, oldReadLease, err := h.loadRow(ctx, tid, true)
if err != nil {
return errors.Trace(err)
}
@@ -175,6 +202,11 @@ func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, lea
if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil {
return errors.Trace(err)
}
{
_updateLocal = true
_lockType = CachedTableLockWrite
_lease = ts
}
case CachedTableLockRead:
// Change from READ to INTEND
if _, err = h.execSQL(ctx,
@@ -187,23 +219,47 @@

// Wait for lease to expire, and then retry.
waitAndRetry = waitForLeaseExpire(lease, now)
{
_updateLocal = true
_lockType = CachedTableLockIntend
_oldReadLease = lease
_lease = ts
}
case CachedTableLockIntend:
// `now` exceed `oldReadLease` means wait for READ lock lease is done, it's safe to read here.
if now > oldReadLease {
if lockType == CachedTableLockIntend {
if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil {
return errors.Trace(err)
}
if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil {
return errors.Trace(err)
}
{
_updateLocal = true
_lockType = CachedTableLockWrite
_lease = ts
}
return nil
}
// Otherwise, the WRITE should wait for the READ lease expire.
// And then retry changing the lock to WRITE
waitAndRetry = waitForLeaseExpire(oldReadLease, now)
case CachedTableLockWrite:
if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil {
return errors.Trace(err)
}
{
_updateLocal = true
_lockType = CachedTableLockWrite
_lease = ts
}
}
return nil
})

if err == nil && _updateLocal {
h.lockType = _lockType
h.lease = _lease
h.oldReadLease = _oldReadLease
}

return
}

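lockForWriteOnce stages its intended local state in temporaries inside the transaction closure and copies them into the handle only after runInTxn returns without error, so a rolled-back transaction never leaves the local copy ahead of the remote row. A generic sketch of that stage-then-apply shape; runTxn below is a stand-in, not TiDB's executor:

```go
package main

import (
	"errors"
	"fmt"
)

type localState struct {
	lockType string
	lease    uint64
}

// runTxn is a stand-in for runInTxn: it runs fn and "commits" only on success.
func runTxn(fn func() error) error { return fn() }

func lockOnce(h *localState, fail bool) error {
	// Stage the would-be local state; don't touch h yet.
	var staged localState
	err := runTxn(func() error {
		if fail {
			return errors.New("remote update failed, txn rolled back")
		}
		staged = localState{lockType: "WRITE", lease: 42}
		return nil
	})
	if err == nil {
		*h = staged // apply only after the transaction succeeded
	}
	return err
}

func main() {
	h := localState{lockType: "NONE"}
	_ = lockOnce(&h, true)
	fmt.Printf("after failed txn:    %+v\n", h) // unchanged
	_ = lockOnce(&h, false)
	fmt.Printf("after committed txn: %+v\n", h)
}
```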
@@ -223,11 +279,9 @@ func waitForLeaseExpire(oldReadLease, now uint64) time.Duration {
// RenewReadLease renew the read lock lease.
// Return the current lease value on success, and return 0 on fail.
func (h *stateRemoteHandle) RenewReadLease(ctx context.Context, tid int64, oldLocalLease, newValue uint64) (uint64, error) {
h.Lock()
defer h.Unlock()
var newLease uint64
err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error {
lockType, remoteLease, _, err := h.loadRow(ctx, tid)
err := h.runInTxn(ctx, false, func(ctx context.Context, now uint64) error {
lockType, remoteLease, _, err := h.loadRow(ctx, tid, false)
if err != nil {
return errors.Trace(err)
}
@@ -267,15 +321,18 @@ func (h *stateRemoteHandle) RenewReadLease(ctx context.Context, tid int64, oldLo
}
return nil
})

return newLease, err
}

func (h *stateRemoteHandle) RenewWriteLease(ctx context.Context, tid int64, newLease uint64) (bool, error) {
h.Lock()
defer h.Unlock()
var succ bool
err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error {
lockType, oldLease, _, err := h.loadRow(ctx, tid)
var (
_lockType CachedTableLockType
_lease uint64
)
err := h.runInTxn(ctx, true, func(ctx context.Context, now uint64) error {
lockType, oldLease, _, err := h.loadRow(ctx, tid, true)
if err != nil {
return errors.Trace(err)
}
@@ -295,13 +352,25 @@ func (h *stateRemoteHandle) RenewWriteLease(ctx context.Context, tid int64, newL
}
}
succ = true
_lockType = CachedTableLockWrite
_lease = newLease
return nil
})

if succ {
h.lockType = _lockType
h.lease = _lease
}
return succ, err
}

func (h *stateRemoteHandle) beginTxn(ctx context.Context) error {
_, err := h.execSQL(ctx, "begin optimistic")
func (h *stateRemoteHandle) beginTxn(ctx context.Context, pessimistic bool) error {
var err error
if pessimistic {
_, err = h.execSQL(ctx, "begin pessimistic")
} else {
_, err = h.execSQL(ctx, "begin optimistic")
}
return err
}

@@ -315,8 +384,8 @@ func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error {
return err
}

func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Context, txnTS uint64) error) error {
err := h.beginTxn(ctx)
func (h *stateRemoteHandle) runInTxn(ctx context.Context, pessimistic bool, fn func(ctx context.Context, txnTS uint64) error) error {
err := h.beginTxn(ctx, pessimistic)
if err != nil {
return errors.Trace(err)
}
@@ -345,8 +414,14 @@ func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Co
return h.commitTxn(ctx)
}

func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64) (CachedTableLockType, uint64, uint64, error) {
chunkRows, err := h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %?", tid)
func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64, forUpdate bool) (CachedTableLockType, uint64, uint64, error) {
var chunkRows []chunk.Row
var err error
if forUpdate {
chunkRows, err = h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %? for update", tid)
} else {
chunkRows, err = h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %?", tid)
}
if err != nil {
return 0, 0, 0, errors.Trace(err)
}
@@ -358,6 +433,12 @@ func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64) (CachedTable
lockType := CachedTableLockType(col1.Value - 1)
lease := chunkRows[0].GetUint64(1)
oldReadLease := chunkRows[0].GetUint64(2)

// Also store a local copy after loadRow()
h.lockType = lockType
h.lease = lease
h.oldReadLease = oldReadLease

return lockType, lease, oldReadLease, nil
}

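Pairing `begin pessimistic` with `select ... for update` keeps the meta row locked from the read until commit on the write paths, while read-only paths keep the cheaper optimistic transaction. A rough sketch of issuing the same statements through database/sql; the DSN is a placeholder and error handling is abbreviated:

```go
package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql" // any MySQL-protocol driver works against TiDB
)

// loadRowForUpdate mirrors loadRow(ctx, tid, true): the FOR UPDATE read happens
// inside a pessimistic transaction, so the meta row stays locked until commit.
func loadRowForUpdate(ctx context.Context, db *sql.DB, tid int64) (lockType string, lease uint64, err error) {
	conn, err := db.Conn(ctx)
	if err != nil {
		return "", 0, err
	}
	defer conn.Close()

	if _, err = conn.ExecContext(ctx, "BEGIN PESSIMISTIC"); err != nil {
		return "", 0, err
	}
	row := conn.QueryRowContext(ctx,
		"SELECT lock_type, lease FROM mysql.table_cache_meta WHERE tid = ? FOR UPDATE", tid)
	if err = row.Scan(&lockType, &lease); err != nil {
		_, _ = conn.ExecContext(ctx, "ROLLBACK")
		return "", 0, err
	}
	// ... update the row here while it is still locked ...
	_, err = conn.ExecContext(ctx, "COMMIT")
	return lockType, lease, err
}

func main() {
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	_, _, _ = loadRowForUpdate(context.Background(), db, 1)
}
```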