When insert and update happens, we will cache the changed info in every session. After a duration(five minutes now) passes, handle will sweep every cache and merge them. Then dump the delta info to TiKV.
131 lines
3.8 KiB
Go
131 lines
3.8 KiB
Go
// Copyright 2017 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package statistics
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/juju/errors"
|
|
"github.com/ngaut/log"
|
|
"github.com/pingcap/tidb/sessionctx/variable"
|
|
"github.com/pingcap/tidb/util/sqlexec"
|
|
)
|
|
|
|
type tableDeltaMap map[int64]variable.TableDelta
|
|
|
|
func (m tableDeltaMap) update(id int64, delta int64, count int64) {
|
|
item := m[id]
|
|
item.Delta += delta
|
|
item.Count += count
|
|
m[id] = item
|
|
}
|
|
|
|
func (m tableDeltaMap) merge(handle *SessionStatsCollector) {
|
|
handle.Lock()
|
|
defer handle.Unlock()
|
|
for id, item := range handle.mapper {
|
|
m.update(id, item.Delta, item.Count)
|
|
}
|
|
handle.mapper = make(tableDeltaMap)
|
|
}
|
|
|
|
// SessionStatsCollector is a list item that holds the delta mapper. If you want to write or read mapper, you must lock it.
|
|
type SessionStatsCollector struct {
|
|
sync.Mutex
|
|
|
|
mapper tableDeltaMap
|
|
prev *SessionStatsCollector
|
|
next *SessionStatsCollector
|
|
// If a session is closed, it only sets this flag true. Every time we sweep the list, we will remove the useless collector.
|
|
deleted bool
|
|
}
|
|
|
|
// Delete only sets the deleted flag true, it will be deleted from list when DumpStatsDeltaToKV is called.
|
|
func (s *SessionStatsCollector) Delete() {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
s.deleted = true
|
|
}
|
|
|
|
// Update will updates the delta and count for one table id.
|
|
func (s *SessionStatsCollector) Update(id int64, delta int64, count int64) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
s.mapper.update(id, delta, count)
|
|
}
|
|
|
|
// tryToRemoveFromList will remove this collector from the list if it's deleted flag is set.
|
|
func (s *SessionStatsCollector) tryToRemoveFromList() {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
if !s.deleted {
|
|
return
|
|
}
|
|
next := s.next
|
|
prev := s.prev
|
|
prev.next = next
|
|
if next != nil {
|
|
next.prev = prev
|
|
}
|
|
}
|
|
|
|
// NewSessionStatsCollector allocates a stats collector for a session.
|
|
func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector {
|
|
h.listHead.Lock()
|
|
defer h.listHead.Unlock()
|
|
newCollector := &SessionStatsCollector{
|
|
mapper: make(tableDeltaMap),
|
|
next: h.listHead.next,
|
|
prev: h.listHead,
|
|
}
|
|
if h.listHead.next != nil {
|
|
h.listHead.next.prev = newCollector
|
|
}
|
|
h.listHead.next = newCollector
|
|
return newCollector
|
|
}
|
|
|
|
// DumpStatsDeltaToKV sweeps the whole list and updates the global map. Then we dumps every table that held in map to KV.
|
|
func (h *Handle) DumpStatsDeltaToKV() {
|
|
h.listHead.Lock()
|
|
for collector := h.listHead.next; collector != nil; collector = collector.next {
|
|
collector.tryToRemoveFromList()
|
|
h.globalMap.merge(collector)
|
|
}
|
|
h.listHead.Unlock()
|
|
for id, item := range h.globalMap {
|
|
err := h.dumpTableStatDeltaToKV(id, item)
|
|
if err == nil {
|
|
delete(h.globalMap, id)
|
|
} else {
|
|
log.Warnf("Error happens when updating stats table, the error message is %s.", err.Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
// dumpTableStatDeltaToKV dumps a single delta with some table to KV and updates the version.
|
|
func (h *Handle) dumpTableStatDeltaToKV(id int64, delta variable.TableDelta) error {
|
|
_, err := h.ctx.(sqlexec.SQLExecutor).Execute("begin")
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
_, err = h.ctx.(sqlexec.SQLExecutor).Execute(fmt.Sprintf("update mysql.stats_meta set version = %d, count = count + %d, modify_count = modify_count + %d where table_id = %d", h.ctx.Txn().StartTS(), delta.Delta, delta.Count, id))
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
_, err = h.ctx.(sqlexec.SQLExecutor).Execute("commit")
|
|
return errors.Trace(err)
|
|
}
|