planner: refactor to make tableStatsDelta thread-safe (#46977)
ref pingcap/tidb#46905
This commit is contained in:
@ -92,14 +92,11 @@ type Handle struct {
|
||||
// written only after acquiring the lock.
|
||||
statsCache *cache.StatsCachePointer
|
||||
|
||||
// globalMap contains all the delta map from collectors when we dump them to KV.
|
||||
globalMap struct {
|
||||
data tableDeltaMap
|
||||
sync.Mutex
|
||||
}
|
||||
// tableDelta contains all the delta map from collectors when we dump them to KV.
|
||||
tableDelta *tableDelta
|
||||
|
||||
// colMap contains all the column stats usage information from collectors when we dump them to KV.
|
||||
colMap *statsUsage
|
||||
// statsUsage contains all the column stats usage information from collectors when we dump them to KV.
|
||||
statsUsage *statsUsage
|
||||
|
||||
// StatsLoad is used to load stats concurrently
|
||||
StatsLoad StatsLoad
|
||||
@ -182,10 +179,8 @@ func (h *Handle) Clear() {
|
||||
h.mu.ctx.GetSessionVars().EnableChunkRPC = false
|
||||
h.mu.ctx.GetSessionVars().SetProjectionConcurrency(0)
|
||||
h.listHead.ClearForTest()
|
||||
h.globalMap.Lock()
|
||||
h.globalMap.data = make(tableDeltaMap)
|
||||
h.globalMap.Unlock()
|
||||
h.colMap.reset()
|
||||
h.tableDelta.reset()
|
||||
h.statsUsage.reset()
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
@ -215,8 +210,8 @@ func NewHandle(ctx, initStatsCtx sessionctx.Context, lease time.Duration, pool s
|
||||
return nil, err
|
||||
}
|
||||
handle.statsCache = statsCache
|
||||
handle.globalMap.data = make(tableDeltaMap)
|
||||
handle.colMap = newStatsUsage()
|
||||
handle.tableDelta = newTableDelta()
|
||||
handle.statsUsage = newStatsUsage()
|
||||
handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
|
||||
handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
|
||||
handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
|
||||
|
||||
@ -42,9 +42,40 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type tableDeltaMap map[int64]variable.TableDelta
|
||||
// tableDelta is used to collect tables' change information.
|
||||
// All methods of it are thread-safe.
|
||||
type tableDelta struct {
|
||||
delta map[int64]variable.TableDelta // map[tableID]delta
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func (m tableDeltaMap) update(id int64, delta int64, count int64, colSize *map[int64]int64) {
|
||||
func newTableDelta() *tableDelta {
|
||||
return &tableDelta{
|
||||
delta: make(map[int64]variable.TableDelta),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *tableDelta) reset() {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
m.delta = make(map[int64]variable.TableDelta)
|
||||
}
|
||||
|
||||
func (m *tableDelta) getDeltaAndReset() map[int64]variable.TableDelta {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
ret := m.delta
|
||||
m.delta = make(map[int64]variable.TableDelta)
|
||||
return ret
|
||||
}
|
||||
|
||||
func (m *tableDelta) update(id int64, delta int64, count int64, colSize *map[int64]int64) {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
updateTableDeltaMap(m.delta, id, delta, count, colSize)
|
||||
}
|
||||
|
||||
func updateTableDeltaMap(m map[int64]variable.TableDelta, id int64, delta int64, count int64, colSize *map[int64]int64) {
|
||||
item := m[id]
|
||||
item.Delta += delta
|
||||
item.Count += count
|
||||
@ -59,13 +90,19 @@ func (m tableDeltaMap) update(id int64, delta int64, count int64, colSize *map[i
|
||||
m[id] = item
|
||||
}
|
||||
|
||||
func (m tableDeltaMap) merge(deltaMap tableDeltaMap) {
|
||||
func (m *tableDelta) merge(deltaMap map[int64]variable.TableDelta) {
|
||||
if len(deltaMap) == 0 {
|
||||
return
|
||||
}
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
for id, item := range deltaMap {
|
||||
m.update(id, item.Delta, item.Count, &item.ColSize)
|
||||
updateTableDeltaMap(m.delta, id, item.Delta, item.Count, &item.ColSize)
|
||||
}
|
||||
}
|
||||
|
||||
// statsUsage maps (tableID, columnID) to the last time when the column stats are used(needed).
|
||||
// All methods of it are thread-safe.
|
||||
type statsUsage struct {
|
||||
usage map[model.TableItemID]time.Time
|
||||
lock sync.RWMutex
|
||||
@ -104,15 +141,14 @@ func (m *statsUsage) merge(other map[model.TableItemID]time.Time) {
|
||||
}
|
||||
}
|
||||
|
||||
func merge(s *SessionStatsCollector, deltaMap tableDeltaMap, colMap *statsUsage) {
|
||||
deltaMap.merge(s.mapper)
|
||||
s.mapper = make(tableDeltaMap)
|
||||
func merge(s *SessionStatsCollector, deltaMap *tableDelta, colMap *statsUsage) {
|
||||
deltaMap.merge(s.mapper.getDeltaAndReset())
|
||||
colMap.merge(s.statsUsage.getUsageAndReset())
|
||||
}
|
||||
|
||||
// SessionStatsCollector is a list item that holds the delta mapper. If you want to write or read mapper, you must lock it.
|
||||
type SessionStatsCollector struct {
|
||||
mapper tableDeltaMap
|
||||
mapper *tableDelta
|
||||
statsUsage *statsUsage
|
||||
next *SessionStatsCollector
|
||||
sync.Mutex
|
||||
@ -124,7 +160,7 @@ type SessionStatsCollector struct {
|
||||
// NewSessionStatsCollector initializes a new SessionStatsCollector.
|
||||
func NewSessionStatsCollector() *SessionStatsCollector {
|
||||
return &SessionStatsCollector{
|
||||
mapper: make(tableDeltaMap),
|
||||
mapper: newTableDelta(),
|
||||
statsUsage: newStatsUsage(),
|
||||
}
|
||||
}
|
||||
@ -147,7 +183,7 @@ func (s *SessionStatsCollector) Update(id int64, delta int64, count int64, colSi
|
||||
func (s *SessionStatsCollector) ClearForTest() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.mapper = make(tableDeltaMap)
|
||||
s.mapper = newTableDelta()
|
||||
s.statsUsage = newStatsUsage()
|
||||
s.next = nil
|
||||
s.deleted = false
|
||||
@ -165,7 +201,7 @@ func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector {
|
||||
h.listHead.Lock()
|
||||
defer h.listHead.Unlock()
|
||||
newCollector := &SessionStatsCollector{
|
||||
mapper: make(tableDeltaMap),
|
||||
mapper: newTableDelta(),
|
||||
next: h.listHead.next,
|
||||
statsUsage: newStatsUsage(),
|
||||
}
|
||||
@ -376,7 +412,7 @@ const (
|
||||
// sweepList will loop over the list, merge each session's local stats into handle
|
||||
// and remove closed session's collector.
|
||||
func (h *Handle) sweepList() {
|
||||
deltaMap := make(tableDeltaMap)
|
||||
deltaMap := newTableDelta()
|
||||
colMap := newStatsUsage()
|
||||
prev := h.listHead
|
||||
prev.Lock()
|
||||
@ -395,25 +431,17 @@ func (h *Handle) sweepList() {
|
||||
}
|
||||
}
|
||||
prev.Unlock()
|
||||
h.globalMap.Lock()
|
||||
h.globalMap.data.merge(deltaMap)
|
||||
h.globalMap.Unlock()
|
||||
h.colMap.merge(colMap.getUsageAndReset())
|
||||
h.tableDelta.merge(deltaMap.getDeltaAndReset())
|
||||
h.statsUsage.merge(colMap.getUsageAndReset())
|
||||
}
|
||||
|
||||
// DumpStatsDeltaToKV sweeps the whole list and updates the global map, then we dumps every table that held in map to KV.
|
||||
// If the mode is `DumpDelta`, it will only dump that delta info that `Modify Count / Table Count` greater than a ratio.
|
||||
func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {
|
||||
h.sweepList()
|
||||
h.globalMap.Lock()
|
||||
deltaMap := h.globalMap.data
|
||||
h.globalMap.data = make(tableDeltaMap)
|
||||
h.globalMap.Unlock()
|
||||
deltaMap := h.tableDelta.getDeltaAndReset()
|
||||
defer func() {
|
||||
h.globalMap.Lock()
|
||||
deltaMap.merge(h.globalMap.data)
|
||||
h.globalMap.data = deltaMap
|
||||
h.globalMap.Unlock()
|
||||
h.tableDelta.merge(deltaMap)
|
||||
}()
|
||||
// TODO: pass in do.InfoSchema() to DumpStatsDeltaToKV.
|
||||
is := func() infoschema.InfoSchema {
|
||||
@ -431,7 +459,7 @@ func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if updated {
|
||||
deltaMap.update(id, -item.Delta, -item.Count, nil)
|
||||
updateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count, nil)
|
||||
}
|
||||
if err = h.dumpTableStatColSizeToKV(id, item); err != nil {
|
||||
delete(deltaMap, id)
|
||||
@ -551,9 +579,9 @@ func (h *Handle) DumpColStatsUsageToKV() error {
|
||||
return nil
|
||||
}
|
||||
h.sweepList()
|
||||
colMap := h.colMap.getUsageAndReset()
|
||||
colMap := h.statsUsage.getUsageAndReset()
|
||||
defer func() {
|
||||
h.colMap.merge(colMap)
|
||||
h.statsUsage.merge(colMap)
|
||||
}()
|
||||
type pair struct {
|
||||
lastUsedAt string
|
||||
|
||||
@ -22,7 +22,7 @@ import (
|
||||
|
||||
func TestInsertAndDelete(t *testing.T) {
|
||||
h := Handle{
|
||||
listHead: &SessionStatsCollector{mapper: make(tableDeltaMap)},
|
||||
listHead: &SessionStatsCollector{mapper: newTableDelta()},
|
||||
}
|
||||
var items []*SessionStatsCollector
|
||||
for i := 0; i < 5; i++ {
|
||||
|
||||
Reference in New Issue
Block a user