From 181bfb73f7fd175ac8a86caeab91f8457b2f5508 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 8 Apr 2022 21:46:33 +0800 Subject: [PATCH] *: polish the table cache feature code (#33408) close pingcap/tidb#33167 --- executor/builder.go | 15 ++-- metrics/metrics.go | 1 + metrics/server.go | 9 ++ session/session.go | 50 ++--------- table/table.go | 7 +- table/tables/cache.go | 170 ++++++++++++++++++++++++++--------- table/tables/state_remote.go | 139 ++++++++++++++++++++++------ 7 files changed, 267 insertions(+), 124 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index e6bbdaa6f2..a2394c0537 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1173,11 +1173,13 @@ func (us *UnionScanExec) handleCachedTable(b *executorBuilder, x bypassDataSourc cachedTable := tbl.(table.CachedTable) // Determine whether the cache can be used. leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second - cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration) + cacheData, loading := cachedTable.TryReadFromCache(startTS, leaseDuration) if cacheData != nil { vars.StmtCtx.ReadFromTableCache = true x.setDummy() us.cacheTable = cacheData + } else if loading { + // continue } else { if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !vars.StmtCtx.InExplainStmt { store := b.ctx.GetStore() @@ -4969,13 +4971,16 @@ func (b *executorBuilder) getCacheTable(tblInfo *model.TableInfo, startTS uint64 } sessVars := b.ctx.GetSessionVars() leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second - cacheData := tbl.(table.CachedTable).TryReadFromCache(startTS, leaseDuration) + cacheData, loading := tbl.(table.CachedTable).TryReadFromCache(startTS, leaseDuration) if cacheData != nil { sessVars.StmtCtx.ReadFromTableCache = true return cacheData - } - if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt { - tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS, leaseDuration) + } else if loading { + // continue + } else { + if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt { + tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS, leaseDuration) + } } return nil } diff --git a/metrics/metrics.go b/metrics/metrics.go index 9ee4656d57..5bbb2ab4a3 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -163,6 +163,7 @@ func RegisterMetrics() { prometheus.MustRegister(PDApiExecutionHistogram) prometheus.MustRegister(CPUProfileCounter) prometheus.MustRegister(ReadFromTableCacheCounter) + prometheus.MustRegister(LoadTableCacheDurationHistogram) tikvmetrics.InitMetrics(TiDB, TiKVClient) tikvmetrics.RegisterMetrics() diff --git a/metrics/server.go b/metrics/server.go index e373144e97..72dc9e4e4b 100644 --- a/metrics/server.go +++ b/metrics/server.go @@ -254,6 +254,15 @@ var ( Name: "cpu_profile_total", Help: "Counter of cpu profiling", }) + + LoadTableCacheDurationHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "load_table_cache_seconds", + Help: "Duration (us) for loading table cache.", + Buckets: prometheus.ExponentialBuckets(1, 2, 30), // 1us ~ 528s + }) ) // ExecuteErrorToLabel converts an execute error to label. 
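For context on the builder.go change above: `TryReadFromCache` now returns a second boolean so callers can distinguish "no cache" from "the cache is currently being loaded by a background goroutine". The sketch below is illustrative only and uses simplified stand-in types (`memBuffer`, `cachedTable`, `readPath`, `fakeTable` are not real TiDB identifiers); it just shows how a caller is expected to branch on the two return values.

```go
package main

import (
	"fmt"
	"time"
)

// memBuffer is a stand-in for kv.MemBuffer in this sketch.
type memBuffer struct{ rows int }

// cachedTable models the new two-value contract of TryReadFromCache:
//   (buf, _)     -> cache hit, serve the read from the buffer
//   (nil, true)  -> a background goroutine is already loading, just fall through
//   (nil, false) -> no usable cache, the caller may trigger an async load
type cachedTable interface {
	TryReadFromCache(ts uint64, leaseDuration time.Duration) (*memBuffer, bool)
	UpdateLockForRead(ts uint64, leaseDuration time.Duration)
}

func readPath(t cachedTable, ts uint64, lease time.Duration) string {
	data, loading := t.TryReadFromCache(ts, lease)
	switch {
	case data != nil:
		return fmt.Sprintf("cache hit: %d rows", data.rows)
	case loading:
		// Someone else is filling the cache; read the original table this time.
		return "cache loading, read original table"
	default:
		// No cache and nobody is loading: kick off the async lock-and-load.
		t.UpdateLockForRead(ts, lease)
		return "triggered async load, read original table"
	}
}

// fakeTable is a trivial in-memory implementation used only to run the sketch.
type fakeTable struct {
	buf     *memBuffer
	loading bool
}

func (f *fakeTable) TryReadFromCache(ts uint64, d time.Duration) (*memBuffer, bool) {
	return f.buf, f.loading
}
func (f *fakeTable) UpdateLockForRead(ts uint64, d time.Duration) { f.loading = true }

func main() {
	t := &fakeTable{}
	fmt.Println(readPath(t, 1, 3*time.Second)) // triggers the load
	fmt.Println(readPath(t, 2, 3*time.Second)) // sees the in-flight load
	t.buf, t.loading = &memBuffer{rows: 42}, false
	fmt.Println(readPath(t, 3, 3*time.Second)) // cache hit
}
```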
diff --git a/session/session.go b/session/session.go index d79710fe93..61dcad4720 100644 --- a/session/session.go +++ b/session/session.go @@ -51,7 +51,7 @@ import ( "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/helper" - "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/util/logutil/consistency" "github.com/pingcap/tidb/util/topsql" @@ -632,10 +632,11 @@ type cachedTableRenewLease struct { func (c *cachedTableRenewLease) start(ctx context.Context) error { c.exit = make(chan struct{}) c.lease = make([]uint64, len(c.tables)) - wg := make(chan error) + wg := make(chan error, len(c.tables)) ith := 0 - for tid, raw := range c.tables { - go c.keepAlive(ctx, wg, raw.(tables.StateRemote), tid, &c.lease[ith]) + for _, raw := range c.tables { + tbl := raw.(table.CachedTable) + go tbl.WriteLockAndKeepAlive(ctx, c.exit, &c.lease[ith], wg) ith++ } @@ -650,47 +651,6 @@ func (c *cachedTableRenewLease) start(ctx context.Context) error { return err } -const cacheTableWriteLease = 5 * time.Second - -func (c *cachedTableRenewLease) keepAlive(ctx context.Context, wg chan error, handle tables.StateRemote, tid int64, leasePtr *uint64) { - writeLockLease, err := handle.LockForWrite(ctx, tid, cacheTableWriteLease) - atomic.StoreUint64(leasePtr, writeLockLease) - wg <- err - if err != nil { - logutil.Logger(ctx).Warn("[cached table] lock for write lock fail", zap.Error(err)) - return - } - - t := time.NewTicker(cacheTableWriteLease) - defer t.Stop() - for { - select { - case <-t.C: - if err := c.renew(ctx, handle, tid, leasePtr); err != nil { - logutil.Logger(ctx).Warn("[cached table] renew write lock lease fail", zap.Error(err)) - return - } - case <-c.exit: - return - } - } -} - -func (c *cachedTableRenewLease) renew(ctx context.Context, handle tables.StateRemote, tid int64, leasePtr *uint64) error { - oldLease := atomic.LoadUint64(leasePtr) - physicalTime := oracle.GetTimeFromTS(oldLease) - newLease := oracle.GoTimeToTS(physicalTime.Add(cacheTableWriteLease)) - - succ, err := handle.RenewWriteLease(ctx, tid, newLease) - if err != nil { - return errors.Trace(err) - } - if succ { - atomic.StoreUint64(leasePtr, newLease) - } - return nil -} - func (c *cachedTableRenewLease) stop(ctx context.Context) { close(c.exit) } diff --git a/table/table.go b/table/table.go index 8c90b9e732..775cb03bb6 100644 --- a/table/table.go +++ b/table/table.go @@ -260,9 +260,14 @@ type CachedTable interface { Init(exec sqlexec.SQLExecutor) error // TryReadFromCache checks if the cache table is readable. - TryReadFromCache(ts uint64, leaseDuration time.Duration) kv.MemBuffer + TryReadFromCache(ts uint64, leaseDuration time.Duration) (kv.MemBuffer, bool) // UpdateLockForRead If you cannot meet the conditions of the read buffer, // you need to update the lock information and read the data from the original table UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64, leaseDuration time.Duration) + + // WriteLockAndKeepAlive first obtain the write lock, then it renew the lease to keep the lock alive. + // 'exit' is a channel to tell the keep alive goroutine to exit. + // The result is sent to the 'wg' channel. 
+ WriteLockAndKeepAlive(ctx context.Context, exit chan struct{}, leasePtr *uint64, wg chan error) } diff --git a/table/tables/cache.go b/table/tables/cache.go index 4de65204cb..fc9f3f52ce 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -23,10 +23,12 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" @@ -40,13 +42,30 @@ var ( type cachedTable struct { TableCommon cacheData atomic.Value - handle StateRemote totalSize int64 - - renewReadLease tokenLimit + // StateRemote is not thread-safe, this tokenLimit is used to keep only one visitor. + tokenLimit } -type tokenLimit = chan struct{} +type tokenLimit chan StateRemote + +func (t tokenLimit) TakeStateRemoteHandle() StateRemote { + handle := <-t + return handle +} + +func (t tokenLimit) TakeStateRemoteHandleNoWait() StateRemote { + select { + case handle := <-t: + return handle + default: + return nil + } +} + +func (t tokenLimit) PutStateRemoteHandle(handle StateRemote) { + t <- handle +} // cacheData pack the cache data and lease. type cacheData struct { @@ -71,10 +90,10 @@ func newMemBuffer(store kv.Storage) (kv.MemBuffer, error) { return buffTxn.GetMemBuffer(), nil } -func (c *cachedTable) TryReadFromCache(ts uint64, leaseDuration time.Duration) kv.MemBuffer { +func (c *cachedTable) TryReadFromCache(ts uint64, leaseDuration time.Duration) (kv.MemBuffer, bool /*loading*/) { tmp := c.cacheData.Load() if tmp == nil { - return nil + return nil, false } data := tmp.(*cacheData) if ts >= data.Start && ts < data.Lease { @@ -88,22 +107,22 @@ func (c *cachedTable) TryReadFromCache(ts uint64, leaseDuration time.Duration) k }) if distance >= 0 && distance <= leaseDuration/2 || triggerFailpoint { - select { - case c.renewReadLease <- struct{}{}: - go c.renewLease(ts, data, leaseDuration) - default: + if h := c.TakeStateRemoteHandleNoWait(); h != nil { + go c.renewLease(h, ts, data, leaseDuration) } } - return data.MemBuffer + // If data is not nil, but data.MemBuffer is nil, it means the data is being + // loading by a background goroutine. 
+ return data.MemBuffer, data.MemBuffer == nil } - return nil + return nil, false } // newCachedTable creates a new CachedTable Instance func newCachedTable(tbl *TableCommon) (table.Table, error) { ret := &cachedTable{ - TableCommon: *tbl, - renewReadLease: make(chan struct{}, 1), + TableCommon: *tbl, + tokenLimit: make(chan StateRemote, 1), } return ret, nil } @@ -115,11 +134,12 @@ func (c *cachedTable) Init(exec sqlexec.SQLExecutor) error { if !ok { return errors.New("Need sqlExec rather than sqlexec.SQLExecutor") } - c.handle = NewStateRemote(raw) + handle := NewStateRemote(raw) + c.PutStateRemoteHandle(handle) return nil } -func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) (kv.MemBuffer, uint64, int64, error) { +func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage) (kv.MemBuffer, uint64, int64, error) { buffer, err := newMemBuffer(store) if err != nil { return nil, 0, 0, err @@ -132,9 +152,6 @@ func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) return errors.Trace(err) } startTS = txn.StartTS() - if startTS >= lease { - return errors.New("the loaded data is outdated for caching") - } it, err := txn.Iter(prefix, prefix.PrefixNext()) if err != nil { return errors.Trace(err) @@ -165,45 +182,57 @@ func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) } func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64, leaseDuration time.Duration) { - select { - case c.renewReadLease <- struct{}{}: - go c.updateLockForRead(ctx, store, ts, leaseDuration) - default: - // There is a inflight calling already. + if h := c.TakeStateRemoteHandle(); h != nil { + go c.updateLockForRead(ctx, h, store, ts, leaseDuration) } } -func (c *cachedTable) updateLockForRead(ctx context.Context, store kv.Storage, ts uint64, leaseDuration time.Duration) { +func (c *cachedTable) updateLockForRead(ctx context.Context, handle StateRemote, store kv.Storage, ts uint64, leaseDuration time.Duration) { defer func() { if r := recover(); r != nil { log.Error("panic in the recoverable goroutine", zap.Reflect("r", r), zap.Stack("stack trace")) } - <-c.renewReadLease + c.PutStateRemoteHandle(handle) }() // Load data from original table and the update lock information. tid := c.Meta().ID lease := leaseFromTS(ts, leaseDuration) - succ, err := c.handle.LockForRead(ctx, tid, lease) + succ, err := handle.LockForRead(ctx, tid, lease) if err != nil { log.Warn("lock cached table for read", zap.Error(err)) return } if succ { - mb, startTS, totalSize, err := c.loadDataFromOriginalTable(store, lease) - if err != nil { - log.Info("load data from table", zap.Error(err)) - return - } - c.cacheData.Store(&cacheData{ - Start: startTS, + Start: ts, Lease: lease, - MemBuffer: mb, + MemBuffer: nil, // Async loading, this will be set later. }) - atomic.StoreInt64(&c.totalSize, totalSize) + + // Make the load data process async, in case that loading data takes longer the + // lease duration, then the loaded data get staled and that process repeats forever. 
+ go func() { + start := time.Now() + mb, startTS, totalSize, err := c.loadDataFromOriginalTable(store) + metrics.LoadTableCacheDurationHistogram.Observe(time.Since(start).Seconds()) + if err != nil { + log.Info("load data from table fail", zap.Error(err)) + return + } + + tmp := c.cacheData.Load().(*cacheData) + if tmp != nil && tmp.Start == ts { + c.cacheData.Store(&cacheData{ + Start: startTS, + Lease: tmp.Lease, + MemBuffer: mb, + }) + atomic.StoreInt64(&c.totalSize, totalSize) + } + }() } // Current status is not suitable to cache. } @@ -215,11 +244,11 @@ func (c *cachedTable) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . if atomic.LoadInt64(&c.totalSize) > cachedTableSizeLimit { return nil, table.ErrOptOnCacheTable.GenWithStackByArgs("table too large") } - txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) + txnCtxAddCachedTable(sctx, c.Meta().ID, c) return c.TableCommon.AddRecord(sctx, r, opts...) } -func txnCtxAddCachedTable(sctx sessionctx.Context, tid int64, handle StateRemote) { +func txnCtxAddCachedTable(sctx sessionctx.Context, tid int64, handle *cachedTable) { txnCtx := sctx.GetSessionVars().TxnCtx if txnCtx.CachedTables == nil { txnCtx.CachedTables = make(map[int64]interface{}) @@ -235,29 +264,31 @@ func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, if atomic.LoadInt64(&c.totalSize) > cachedTableSizeLimit { return table.ErrOptOnCacheTable.GenWithStackByArgs("table too large") } - txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) + txnCtxAddCachedTable(sctx, c.Meta().ID, c) return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched) } // RemoveRecord implements table.Table RemoveRecord interface. func (c *cachedTable) RemoveRecord(sctx sessionctx.Context, h kv.Handle, r []types.Datum) error { - txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) + txnCtxAddCachedTable(sctx, c.Meta().ID, c) return c.TableCommon.RemoveRecord(sctx, h, r) } // TestMockRenewLeaseABA2 is used by test function TestRenewLeaseABAFailPoint. var TestMockRenewLeaseABA2 chan struct{} -func (c *cachedTable) renewLease(ts uint64, data *cacheData, leaseDuration time.Duration) { - defer func() { <-c.renewReadLease }() - +func (c *cachedTable) renewLease(handle StateRemote, ts uint64, data *cacheData, leaseDuration time.Duration) { failpoint.Inject("mockRenewLeaseABA2", func(_ failpoint.Value) { + c.PutStateRemoteHandle(handle) <-TestMockRenewLeaseABA2 + c.TakeStateRemoteHandle() }) + defer c.PutStateRemoteHandle(handle) + tid := c.Meta().ID lease := leaseFromTS(ts, leaseDuration) - newLease, err := c.handle.RenewReadLease(context.Background(), tid, data.Lease, lease) + newLease, err := handle.RenewReadLease(context.Background(), tid, data.Lease, lease) if err != nil && !kv.IsTxnRetryableError(err) { log.Warn("Renew read lease error", zap.Error(err)) } @@ -273,3 +304,54 @@ func (c *cachedTable) renewLease(ts uint64, data *cacheData, leaseDuration time. 
TestMockRenewLeaseABA2 <- struct{}{} }) } + +const cacheTableWriteLease = 5 * time.Second + +func (c *cachedTable) WriteLockAndKeepAlive(ctx context.Context, exit chan struct{}, leasePtr *uint64, wg chan error) { + writeLockLease, err := c.lockForWrite(ctx) + atomic.StoreUint64(leasePtr, writeLockLease) + wg <- err + if err != nil { + logutil.Logger(ctx).Warn("[cached table] lock for write lock fail", zap.Error(err)) + return + } + + t := time.NewTicker(cacheTableWriteLease) + defer t.Stop() + for { + select { + case <-t.C: + if err := c.renew(ctx, leasePtr); err != nil { + logutil.Logger(ctx).Warn("[cached table] renew write lock lease fail", zap.Error(err)) + return + } + case <-exit: + return + } + } +} + +func (c *cachedTable) renew(ctx context.Context, leasePtr *uint64) error { + oldLease := atomic.LoadUint64(leasePtr) + physicalTime := oracle.GetTimeFromTS(oldLease) + newLease := oracle.GoTimeToTS(physicalTime.Add(cacheTableWriteLease)) + + h := c.TakeStateRemoteHandle() + defer c.PutStateRemoteHandle(h) + + succ, err := h.RenewWriteLease(ctx, c.Meta().ID, newLease) + if err != nil { + return errors.Trace(err) + } + if succ { + atomic.StoreUint64(leasePtr, newLease) + } + return nil +} + +func (c *cachedTable) lockForWrite(ctx context.Context) (uint64, error) { + handle := c.TakeStateRemoteHandle() + defer c.PutStateRemoteHandle(handle) + + return handle.LockForWrite(ctx, c.Meta().ID, cacheTableWriteLease) +} diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 6abbebf3e3..4a8d0b39b6 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -17,7 +17,6 @@ package tables import ( "context" "strconv" - "sync" "time" "github.com/pingcap/errors" @@ -56,6 +55,7 @@ func (l CachedTableLockType) String() string { } // StateRemote is the interface to control the remote state of the cached table's lock meta information. +// IMPORTANT: It's not thread-safe, the caller should be aware of that! type StateRemote interface { // Load obtain the corresponding lock type and lease value according to the tableID Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) @@ -82,7 +82,13 @@ type sqlExec interface { type stateRemoteHandle struct { exec sqlExec - sync.Mutex + + // local state, this could be staled. + // Since stateRemoteHandle is used in single thread, it's safe for all operations + // to check the local state first to avoid unnecessary remote TiKV access. + lockType CachedTableLockType + lease uint64 + oldReadLease uint64 } // NewStateRemote creates a StateRemote object. @@ -95,16 +101,22 @@ func NewStateRemote(exec sqlExec) *stateRemoteHandle { var _ StateRemote = &stateRemoteHandle{} func (h *stateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) { - lockType, lease, _, err := h.loadRow(ctx, tid) + lockType, lease, _, err := h.loadRow(ctx, tid, false) return lockType, lease, err } func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, newLease uint64) ( /*succ*/ bool, error) { - h.Lock() - defer h.Unlock() succ := false - err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { - lockType, lease, _, err := h.loadRow(ctx, tid) + if h.lease >= newLease { + // There is a write lock or intention, don't lock for read. 
+ switch h.lockType { + case CachedTableLockIntend, CachedTableLockWrite: + return false, nil + } + } + + err := h.runInTxn(ctx, false, func(ctx context.Context, now uint64) error { + lockType, lease, _, err := h.loadRow(ctx, tid, false) if err != nil { return errors.Trace(err) } @@ -137,9 +149,17 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, newLease // LockForWrite try to add a write lock to the table with the specified tableID, return the write lock lease. func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, leaseDuration time.Duration) (uint64, error) { - h.Lock() - defer h.Unlock() var ret uint64 + + if h.lockType == CachedTableLockWrite { + safe := oracle.GoTimeToTS(time.Now().Add(leaseDuration / 2)) + if h.lease > safe { + // It means the remote has already been write locked and the lock will be valid for a while. + // So we can return directly. + return h.lease, nil + } + } + for { waitAndRetry, lease, err := h.lockForWriteOnce(ctx, tid, leaseDuration) if err != nil { @@ -155,8 +175,15 @@ func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, leaseDu } func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, leaseDuration time.Duration) (waitAndRetry time.Duration, ts uint64, err error) { - err = h.runInTxn(ctx, func(ctx context.Context, now uint64) error { - lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) + var ( + _updateLocal bool + _lockType CachedTableLockType + _lease uint64 + _oldReadLease uint64 + ) + + err = h.runInTxn(ctx, true, func(ctx context.Context, now uint64) error { + lockType, lease, oldReadLease, err := h.loadRow(ctx, tid, true) if err != nil { return errors.Trace(err) } @@ -175,6 +202,11 @@ func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, lea if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { return errors.Trace(err) } + { + _updateLocal = true + _lockType = CachedTableLockWrite + _lease = ts + } case CachedTableLockRead: // Change from READ to INTEND if _, err = h.execSQL(ctx, @@ -187,23 +219,47 @@ func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, lea // Wait for lease to expire, and then retry. waitAndRetry = waitForLeaseExpire(lease, now) + { + _updateLocal = true + _lockType = CachedTableLockIntend + _oldReadLease = lease + _lease = ts + } case CachedTableLockIntend: // `now` exceed `oldReadLease` means wait for READ lock lease is done, it's safe to read here. if now > oldReadLease { - if lockType == CachedTableLockIntend { - if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { - return errors.Trace(err) - } + if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return errors.Trace(err) + } + { + _updateLocal = true + _lockType = CachedTableLockWrite + _lease = ts } return nil } // Otherwise, the WRITE should wait for the READ lease expire. // And then retry changing the lock to WRITE waitAndRetry = waitForLeaseExpire(oldReadLease, now) + case CachedTableLockWrite: + if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return errors.Trace(err) + } + { + _updateLocal = true + _lockType = CachedTableLockWrite + _lease = ts + } } return nil }) + if err == nil && _updateLocal { + h.lockType = _lockType + h.lease = _lease + h.oldReadLease = _oldReadLease + } + return } @@ -223,11 +279,9 @@ func waitForLeaseExpire(oldReadLease, now uint64) time.Duration { // RenewReadLease renew the read lock lease. // Return the current lease value on success, and return 0 on fail. 
func (h *stateRemoteHandle) RenewReadLease(ctx context.Context, tid int64, oldLocalLease, newValue uint64) (uint64, error) { - h.Lock() - defer h.Unlock() var newLease uint64 - err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { - lockType, remoteLease, _, err := h.loadRow(ctx, tid) + err := h.runInTxn(ctx, false, func(ctx context.Context, now uint64) error { + lockType, remoteLease, _, err := h.loadRow(ctx, tid, false) if err != nil { return errors.Trace(err) } @@ -267,15 +321,18 @@ func (h *stateRemoteHandle) RenewReadLease(ctx context.Context, tid int64, oldLo } return nil }) + return newLease, err } func (h *stateRemoteHandle) RenewWriteLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { - h.Lock() - defer h.Unlock() var succ bool - err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { - lockType, oldLease, _, err := h.loadRow(ctx, tid) + var ( + _lockType CachedTableLockType + _lease uint64 + ) + err := h.runInTxn(ctx, true, func(ctx context.Context, now uint64) error { + lockType, oldLease, _, err := h.loadRow(ctx, tid, true) if err != nil { return errors.Trace(err) } @@ -295,13 +352,25 @@ func (h *stateRemoteHandle) RenewWriteLease(ctx context.Context, tid int64, newL } } succ = true + _lockType = CachedTableLockWrite + _lease = newLease return nil }) + + if succ { + h.lockType = _lockType + h.lease = _lease + } return succ, err } -func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { - _, err := h.execSQL(ctx, "begin optimistic") +func (h *stateRemoteHandle) beginTxn(ctx context.Context, pessimistic bool) error { + var err error + if pessimistic { + _, err = h.execSQL(ctx, "begin pessimistic") + } else { + _, err = h.execSQL(ctx, "begin optimistic") + } return err } @@ -315,8 +384,8 @@ func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error { return err } -func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Context, txnTS uint64) error) error { - err := h.beginTxn(ctx) +func (h *stateRemoteHandle) runInTxn(ctx context.Context, pessimistic bool, fn func(ctx context.Context, txnTS uint64) error) error { + err := h.beginTxn(ctx, pessimistic) if err != nil { return errors.Trace(err) } @@ -345,8 +414,14 @@ func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Co return h.commitTxn(ctx) } -func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64) (CachedTableLockType, uint64, uint64, error) { - chunkRows, err := h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %?", tid) +func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64, forUpdate bool) (CachedTableLockType, uint64, uint64, error) { + var chunkRows []chunk.Row + var err error + if forUpdate { + chunkRows, err = h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %? for update", tid) + } else { + chunkRows, err = h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %?", tid) + } if err != nil { return 0, 0, 0, errors.Trace(err) } @@ -358,6 +433,12 @@ func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64) (CachedTable lockType := CachedTableLockType(col1.Value - 1) lease := chunkRows[0].GetUint64(1) oldReadLease := chunkRows[0].GetUint64(2) + + // Also store a local copy after loadRow() + h.lockType = lockType + h.lease = lease + h.oldReadLease = oldReadLease + return lockType, lease, oldReadLease, nil }
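A note on the concurrency design in cache.go: `stateRemoteHandle` is documented as not thread-safe, so `cachedTable` now serializes access to it through a one-slot channel (`tokenLimit`) instead of a mutex. Below is a minimal, self-contained sketch of that ownership-token pattern; the names mirror the diff but the types (`stateRemote`, the `take`/`takeNoWait`/`put` methods) are simplified stand-ins, not the real API.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stateRemote stands in for the non-thread-safe StateRemote handle.
type stateRemote struct{ calls int }

// tokenLimit is a one-slot channel that holds the single handle.
// Whoever receives the handle owns it exclusively until it is put back.
type tokenLimit chan *stateRemote

// take blocks until the handle is available.
func (t tokenLimit) take() *stateRemote { return <-t }

// takeNoWait returns nil if another goroutine currently owns the handle,
// which is how TryReadFromCache avoids piling up lease-renew goroutines.
func (t tokenLimit) takeNoWait() *stateRemote {
	select {
	case h := <-t:
		return h
	default:
		return nil
	}
}

// put returns ownership of the handle.
func (t tokenLimit) put(h *stateRemote) { t <- h }

func main() {
	limit := make(tokenLimit, 1)
	limit.put(&stateRemote{})

	var wg sync.WaitGroup
	var skipped int32
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			h := limit.takeNoWait()
			if h == nil {
				atomic.AddInt32(&skipped, 1) // token is busy; give up instead of waiting
				return
			}
			h.calls++ // safe: only the token holder touches the handle
			limit.put(h)
		}()
	}
	wg.Wait()

	h := limit.take()
	fmt.Printf("handle used %d times, %d goroutines skipped\n", h.calls, atomic.LoadInt32(&skipped))
	limit.put(h)
}
```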
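The write-lock path that previously lived in session.go now lives in `cachedTable.WriteLockAndKeepAlive`. The sketch below shows the shape of that loop under stated assumptions: acquire the lock once, report the result on the `wg` channel, renew the lease on a ticker, and stop when `exit` is closed. `fakeRemote` and the plain nanosecond lease arithmetic are simplifications; the real code renews through `StateRemote.RenewWriteLease` with TSO-based timestamps and a 5-second `cacheTableWriteLease`.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

const writeLease = 50 * time.Millisecond // stands in for cacheTableWriteLease (5s in the diff)

// fakeRemote pretends to be the remote lock state; these are local stubs,
// not the real StateRemote methods.
type fakeRemote struct{}

func (fakeRemote) LockForWrite() (uint64, error)         { return uint64(time.Now().UnixNano()), nil }
func (fakeRemote) RenewWriteLease(newLease uint64) error { return nil }

// writeLockAndKeepAlive mirrors the shape of cachedTable.WriteLockAndKeepAlive:
// acquire the write lock once, report the result on wg, then renew the lease
// on a ticker until the exit channel is closed.
func writeLockAndKeepAlive(r fakeRemote, exit <-chan struct{}, leasePtr *uint64, wg chan<- error) {
	lease, err := r.LockForWrite()
	atomic.StoreUint64(leasePtr, lease)
	wg <- err
	if err != nil {
		return
	}
	t := time.NewTicker(writeLease)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			newLease := atomic.LoadUint64(leasePtr) + uint64(writeLease)
			if err := r.RenewWriteLease(newLease); err != nil {
				return // stop renewing; the write lock will simply expire
			}
			atomic.StoreUint64(leasePtr, newLease)
		case <-exit:
			return
		}
	}
}

func main() {
	var lease uint64
	exit := make(chan struct{})
	wg := make(chan error, 1)
	go writeLockAndKeepAlive(fakeRemote{}, exit, &lease, wg)
	if err := <-wg; err != nil {
		fmt.Println("lock failed:", err)
		return
	}
	time.Sleep(120 * time.Millisecond) // let a couple of renewals happen
	close(exit)
	fmt.Println("final lease:", atomic.LoadUint64(&lease))
}
```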