tidb/bindinfo/cache.go

// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package bindinfo

import (
	"context"
	"fmt"
	"sync/atomic"

	"github.com/pingcap/errors"
	"github.com/pingcap/parser"
	"github.com/pingcap/parser/ast"
	"github.com/pingcap/parser/terror"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/sqlexec"
)

const (
	using   = "using"
	deleted = "deleted"
)

// bindMeta stores the basic bind info and the AST parsed from the bind SQL.
type bindMeta struct {
	*bindRecord
	ast ast.StmtNode // ast is used when checking whether a query SQL matches this binding.
}

// cache is a k-v map: the key is the digest hash of the original SQL (parser.DigestHash),
// the value is a slice of bindMeta.
type cache map[string][]*bindMeta

// Handle holds an atomic cache.
type Handle struct {
	atomic.Value
}

// BindCacheUpdater is used to update the global bind cache.
// BindCacheUpdater refreshes the bind cache every 3 seconds in the domain
// goroutine loop. When the TiDB server first starts up, the updater loads
// all bind info into memory; after that it loads only the diff bind info,
// once every 3 seconds.
type BindCacheUpdater struct {
	ctx            sessionctx.Context
	parser         *parser.Parser
	lastUpdateTime types.Time
	globalHandle   *Handle
}

// bindRecord represents one row of mysql.bind_info.
type bindRecord struct {
	OriginalSQL string
	BindSQL     string
	Db          string
	// Status represents the status of the binding. It can only be one of the following values:
	// 1. deleted: the bindRecord is deleted and can not be used anymore.
	// 2. using: the bindRecord is in the normal, active mode.
	Status     string
	CreateTime types.Time
	UpdateTime types.Time
	Charset    string
	Collation  string
}

// NewBindCacheUpdater creates a new BindCacheUpdater.
func NewBindCacheUpdater(ctx sessionctx.Context, handle *Handle, parser *parser.Parser) *BindCacheUpdater {
	return &BindCacheUpdater{
		ctx:          ctx,
		parser:       parser,
		globalHandle: handle,
	}
}

// NewHandle creates a Handle with an empty cache.
func NewHandle() *Handle {
	handle := &Handle{}
	return handle
}

// Get gets the cache from a Handle.
func (h *Handle) Get() cache {
	bc := h.Load()
	if bc != nil {
		return bc.(map[string][]*bindMeta)
	}
	return make(map[string][]*bindMeta)
}
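
// lookupBindingExample is an illustrative sketch, not part of the original file.
// It shows how a caller could fetch the cached binding for a query: hash the
// original SQL with parser.DigestHash (the same key appendNode uses), take a
// snapshot of the cache through Handle.Get, and scan the slice for the entry
// whose OriginalSQL and Db both match. The function name and the exact match
// rule applied by the planner are assumptions made for this example.
func lookupBindingExample(h *Handle, originalSQL, db string) *bindMeta {
	hash := parser.DigestHash(originalSQL)
	for _, meta := range h.Get()[hash] {
		if meta.OriginalSQL == originalSQL && meta.Db == db {
			return meta
		}
	}
	// No binding for this query in the current snapshot.
	return nil
}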

// loadDiff executes the given sql and loads the returned bind info into the cache bc.
func (bindCacheUpdater *BindCacheUpdater) loadDiff(sql string, bc cache) error {
	recordSets, err := bindCacheUpdater.ctx.(sqlexec.SQLExecutor).Execute(context.Background(), sql)
	if err != nil {
		return errors.Trace(err)
	}
	rs := recordSets[0]
	defer terror.Call(rs.Close)
	chkBatch := rs.NewRecordBatch()
	for {
		err = rs.Next(context.TODO(), chkBatch)
		if err != nil || chkBatch.NumRows() == 0 {
			return errors.Trace(err)
		}
		it := chunk.NewIterator4Chunk(chkBatch.Chunk)
		for row := it.Begin(); row != it.End(); row = it.Next() {
			record := newBindMeta(row)
			err = bc.appendNode(record, bindCacheUpdater.parser)
			if err != nil {
				return err
			}
			// Track the latest update time so the next diff load only fetches newer rows.
			if record.UpdateTime.Compare(bindCacheUpdater.lastUpdateTime) == 1 {
				bindCacheUpdater.lastUpdateTime = record.UpdateTime
			}
		}
	}
}

// Update loads new bind info and refreshes the global cache held by globalHandle.
// `fullLoad` is true only when the TiDB server first starts up; otherwise it is false.
func (bindCacheUpdater *BindCacheUpdater) Update(fullLoad bool) (err error) {
	var sql string
	bc := bindCacheUpdater.globalHandle.Get()
	// Copy the current cache so readers keep a consistent snapshot while the diff is applied.
	newBc := make(map[string][]*bindMeta, len(bc))
	for hash, bindDataArr := range bc {
		newBc[hash] = append(newBc[hash], bindDataArr...)
	}
	if fullLoad {
		sql = "select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation from mysql.bind_info"
	} else {
		sql = fmt.Sprintf("select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation from mysql.bind_info where update_time > \"%s\"", bindCacheUpdater.lastUpdateTime.String())
	}
	err = bindCacheUpdater.loadDiff(sql, newBc)
	if err != nil {
		return errors.Trace(err)
	}
	bindCacheUpdater.globalHandle.Store(newBc)
	return nil
}
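
// runUpdateLoopExample is an illustrative sketch, not part of the original file.
// It spells out the refresh loop described on BindCacheUpdater: one full load
// when the TiDB server starts up, then one diff load per tick (the real loop in
// the domain goroutine ticks every 3 seconds). The function name, the tick
// channel parameter and the plain fmt logging are assumptions for the example.
func runUpdateLoopExample(updater *BindCacheUpdater, tick <-chan struct{}) {
	// Full load: read every row of mysql.bind_info into a fresh cache.
	if err := updater.Update(true); err != nil {
		fmt.Println("full load of bind info failed:", err)
	}
	// Diff loads: only rows whose update_time is newer than the last one seen.
	for range tick {
		if err := updater.Update(false); err != nil {
			fmt.Println("diff load of bind info failed:", err)
		}
	}
}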

// newBindMeta builds a bindRecord from one row of mysql.bind_info.
func newBindMeta(row chunk.Row) *bindRecord {
	return &bindRecord{
		OriginalSQL: row.GetString(0),
		BindSQL:     row.GetString(1),
		Db:          row.GetString(2),
		Status:      row.GetString(3),
		CreateTime:  row.GetTime(4),
		UpdateTime:  row.GetTime(5),
		Charset:     row.GetString(6),
		Collation:   row.GetString(7),
	}
}

// appendNode removes the cached binding that matches the new record's original SQL and DB
// (if any), then parses the bind SQL and appends the new record unless it is marked deleted.
func (b cache) appendNode(newBindRecord *bindRecord, sparser *parser.Parser) error {
	hash := parser.DigestHash(newBindRecord.OriginalSQL)
	if bindArr, ok := b[hash]; ok {
		for idx, v := range bindArr {
			if v.OriginalSQL == newBindRecord.OriginalSQL && v.Db == newBindRecord.Db {
				// Drop the stale entry; it is either being deleted or replaced by the new record.
				b[hash] = append(b[hash][:idx], b[hash][idx+1:]...)
				if len(b[hash]) == 0 {
					delete(b, hash)
				}
				break
			}
		}
	}
	if newBindRecord.Status == deleted {
		return nil
	}
	stmtNodes, _, err := sparser.Parse(newBindRecord.BindSQL, newBindRecord.Charset, newBindRecord.Collation)
	if err != nil {
		return errors.Trace(err)
	}
	newNode := &bindMeta{
		bindRecord: newBindRecord,
		ast:        stmtNodes[0],
	}
	b[hash] = append(b[hash], newNode)
	return nil
}
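
// appendNodeExample is an illustrative sketch, not part of the original file.
// It walks through the lifecycle appendNode implements: a record with status
// "using" is parsed and cached, and a later record for the same original SQL
// and DB with status "deleted" removes it again. All field values below are
// made up for the example.
func appendNodeExample(sparser *parser.Parser) error {
	bc := make(cache)
	record := &bindRecord{
		OriginalSQL: "select * from t where a > ?",
		BindSQL:     "select * from t use index (idx_a) where a > 1",
		Db:          "test",
		Status:      using,
		Charset:     "utf8mb4",
		Collation:   "utf8mb4_bin",
	}
	// First sight of the binding: BindSQL is parsed and the entry is cached.
	if err := bc.appendNode(record, sparser); err != nil {
		return err
	}
	// A later row marking the same binding as deleted drops it from the cache.
	deletedRecord := *record
	deletedRecord.Status = deleted
	return bc.appendNode(&deletedRecord, sparser)
}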