Merge pull request #917 from nieyy/master
performance_schema: add initial support for statement instruments
This commit is contained in:
@ -65,9 +65,19 @@ func decodeValue(data []byte, cols []*model.ColumnInfo) ([]interface{}, error) {
|
||||
|
||||
var rvalues []interface{}
|
||||
for i, col := range cols {
|
||||
if values[i] == nil {
|
||||
rvalues = append(rvalues, nil)
|
||||
continue
|
||||
}
|
||||
// TODO: support more types if we really need.
|
||||
switch col.Tp {
|
||||
case mysql.TypeString, mysql.TypeVarchar:
|
||||
case mysql.TypeLong:
|
||||
val := values[i].(int64)
|
||||
rvalues = append(rvalues, val)
|
||||
case mysql.TypeLonglong:
|
||||
val := values[i].(uint64)
|
||||
rvalues = append(rvalues, val)
|
||||
case mysql.TypeString, mysql.TypeVarchar, mysql.TypeLongBlob:
|
||||
val := string(values[i].([]byte))
|
||||
rvalues = append(rvalues, val)
|
||||
case mysql.TypeEnum:
|
||||
|
||||
124
perfschema/instrument.go
Normal file
124
perfschema/instrument.go
Normal file
@ -0,0 +1,124 @@
|
||||
// Copyright 2016 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package perfschema
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/juju/errors"
|
||||
"github.com/pingcap/tidb/mysql"
|
||||
"github.com/pingcap/tidb/util/codec"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
)
|
||||
|
||||
// EnumCallerName is used as a parameter to avoid calling runtime.Caller(1) since
|
||||
// it is too expensive (500ns+ per call), we don't want to invoke it repeatedly for
|
||||
// each instrument.
|
||||
type EnumCallerName int
|
||||
|
||||
const (
|
||||
// CallerNameSessionExecute is for session.go:Execute() method.
|
||||
CallerNameSessionExecute EnumCallerName = iota + 1
|
||||
)
|
||||
|
||||
const (
|
||||
stageInstrumentPrefix = "stage/"
|
||||
statementInstrumentPrefix = "statement/"
|
||||
transactionInstrumentPrefix = "transaction"
|
||||
)
|
||||
|
||||
// Flag indicators for table setup_timers.
|
||||
const (
|
||||
flagStage = iota + 1
|
||||
flagStatement
|
||||
flagTransaction
|
||||
)
|
||||
|
||||
type enumTimerName int
|
||||
|
||||
// Enum values for the TIMER_NAME columns.
|
||||
// This enum is found in the following tables:
|
||||
// - performance_schema.setup_timer (TIMER_NAME)
|
||||
const (
|
||||
timerNameNone enumTimerName = iota
|
||||
timerNameNanosec
|
||||
timerNameMicrosec
|
||||
timerNameMillisec
|
||||
)
|
||||
|
||||
var (
|
||||
callerNames = make(map[EnumCallerName]string)
|
||||
)
|
||||
|
||||
// addInstrument is used to add an item to setup_instruments table.
|
||||
func (ps *perfSchema) addInstrument(name string) (uint64, error) {
|
||||
store := ps.stores[TableSetupInstruments]
|
||||
lastLsn := atomic.AddUint64(ps.lsns[TableSetupInstruments], 1)
|
||||
|
||||
batch := pool.Get().(*leveldb.Batch)
|
||||
defer func() {
|
||||
batch.Reset()
|
||||
pool.Put(batch)
|
||||
}()
|
||||
|
||||
record := []interface{}{name, mysql.Enum{Name: "YES", Value: 1}, mysql.Enum{Name: "YES", Value: 1}}
|
||||
lsn := lastLsn - 1
|
||||
rawKey := []interface{}{uint64(lsn)}
|
||||
key, err := codec.EncodeKey(nil, rawKey...)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
val, err := codec.EncodeValue(nil, record...)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
batch.Put(key, val)
|
||||
|
||||
err = store.Write(batch, nil)
|
||||
return lsn, errors.Trace(err)
|
||||
}
|
||||
|
||||
func (ps *perfSchema) getTimerName(flag int) (enumTimerName, error) {
|
||||
store := ps.stores[TableSetupTimers]
|
||||
|
||||
rawKey := []interface{}{uint64(flag)}
|
||||
key, err := codec.EncodeKey(nil, rawKey...)
|
||||
if err != nil {
|
||||
return timerNameNone, errors.Trace(err)
|
||||
}
|
||||
|
||||
val, err := store.Get(key, nil)
|
||||
if err != nil {
|
||||
return timerNameNone, errors.Trace(err)
|
||||
}
|
||||
|
||||
record, err := decodeValue(val, ps.tables[TableSetupTimers].Columns)
|
||||
if err != nil {
|
||||
return timerNameNone, errors.Trace(err)
|
||||
}
|
||||
|
||||
timerName, ok := record[1].(string)
|
||||
if !ok {
|
||||
return timerNameNone, errors.New("Timer type does not match")
|
||||
}
|
||||
switch timerName {
|
||||
case "NANOSECOND":
|
||||
return timerNameNanosec, nil
|
||||
case "MICROSECOND":
|
||||
return timerNameMicrosec, nil
|
||||
case "MILLISECOND":
|
||||
return timerNameMillisec, nil
|
||||
}
|
||||
return timerNameNone, nil
|
||||
}
|
||||
@ -22,6 +22,15 @@ import (
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
)
|
||||
|
||||
// StatementInstrument defines the methods for statement instrumentation points
|
||||
type StatementInstrument interface {
|
||||
RegisterStatement(category, name string, elem interface{})
|
||||
|
||||
StartStatement(sql string, connID uint64, callerName EnumCallerName, elem interface{}) *StatementState
|
||||
|
||||
EndStatement(state *StatementState)
|
||||
}
|
||||
|
||||
// PerfSchema defines the methods to be invoked by the executor
|
||||
type PerfSchema interface {
|
||||
// For SELECT statement only.
|
||||
@ -29,6 +38,9 @@ type PerfSchema interface {
|
||||
|
||||
// For INSERT statement only.
|
||||
ExecInsert(insertVals *InsertValues) error
|
||||
|
||||
// For statement instrumentation only.
|
||||
StatementInstrument
|
||||
}
|
||||
|
||||
type perfSchema struct {
|
||||
|
||||
@ -171,3 +171,18 @@ func (p *testPerfSchemaSuit) TestInsert(c *C) {
|
||||
mustFailExec(c, testDB, `insert into performance_schema.setup_consumers values("events_stages_current", "YES");`)
|
||||
mustFailExec(c, testDB, `insert into performance_schema.setup_timers values("timer1", "NANOSECOND");`)
|
||||
}
|
||||
|
||||
func (p *testPerfSchemaSuit) TestInstrument(c *C) {
|
||||
testDB, err := sql.Open(tidb.DriverName, tidb.EngineGoLevelDBMemory+"/test/test")
|
||||
c.Assert(err, IsNil)
|
||||
defer testDB.Close()
|
||||
|
||||
cnt := mustQuery(c, testDB, "select * from performance_schema.setup_instruments")
|
||||
c.Assert(cnt, Greater, 0)
|
||||
|
||||
mustExec(c, testDB, "show tables")
|
||||
cnt = mustQuery(c, testDB, "select * from performance_schema.events_statements_current")
|
||||
c.Assert(cnt, Equals, 1)
|
||||
cnt = mustQuery(c, testDB, "select * from performance_schema.events_statements_history")
|
||||
c.Assert(cnt, Greater, 0)
|
||||
}
|
||||
|
||||
334
perfschema/statement.go
Normal file
334
perfschema/statement.go
Normal file
@ -0,0 +1,334 @@
|
||||
// Copyright 2016 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package perfschema
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/juju/errors"
|
||||
"github.com/ngaut/log"
|
||||
"github.com/pingcap/tidb/ast"
|
||||
"github.com/pingcap/tidb/util/codec"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
)
|
||||
|
||||
// statementInfo defines statement instrument information.
|
||||
type statementInfo struct {
|
||||
// The registered statement key
|
||||
key uint64
|
||||
// The name of the statement instrument to register
|
||||
name string
|
||||
}
|
||||
|
||||
// StatementState provides temporary storage to a statement runtime statistics.
|
||||
// TODO:
|
||||
// 1. support statement digest.
|
||||
// 2. support prepared statement.
|
||||
type StatementState struct {
|
||||
// Connection identifier
|
||||
connID uint64
|
||||
// Statement information
|
||||
info *statementInfo
|
||||
// Statement type
|
||||
stmtType reflect.Type
|
||||
// Source file and line number
|
||||
source string
|
||||
// Timer name
|
||||
timerName enumTimerName
|
||||
// Timer start
|
||||
timerStart int64
|
||||
// Timer end
|
||||
timerEnd int64
|
||||
// Locked time
|
||||
lockTime int64
|
||||
// SQL statement string
|
||||
sqlText string
|
||||
// Current schema name
|
||||
schemaName string
|
||||
// Number of errors
|
||||
errNum uint32
|
||||
// Number of warnings
|
||||
warnNum uint32
|
||||
// Rows affected
|
||||
rowsAffected uint64
|
||||
// Rows sent
|
||||
rowsSent uint64
|
||||
// Rows examined
|
||||
rowsExamined uint64
|
||||
// Metric, temporary tables created on disk
|
||||
createdTmpDiskTables uint32
|
||||
// Metric, temproray tables created
|
||||
createdTmpTables uint32
|
||||
// Metric, number of select full join
|
||||
selectFullJoin uint32
|
||||
// Metric, number of select full range join
|
||||
selectFullRangeJoin uint32
|
||||
// Metric, number of select range
|
||||
selectRange uint32
|
||||
// Metric, number of select range check
|
||||
selectRangeCheck uint32
|
||||
// Metric, number of select scan
|
||||
selectScan uint32
|
||||
// Metric, number of sort merge passes
|
||||
sortMergePasses uint32
|
||||
// Metric, number of sort merge
|
||||
sortRange uint32
|
||||
// Metric, number of sort rows
|
||||
sortRows uint32
|
||||
// Metric, number of sort scans
|
||||
sortScan uint32
|
||||
// Metric, no index used flag
|
||||
noIndexUsed uint8
|
||||
// Metric, no good index used flag
|
||||
noGoodIndexUsed uint8
|
||||
}
|
||||
|
||||
const (
|
||||
// Maximum allowed number of elements in table events_statements_history.
|
||||
// TODO: make it configurable?
|
||||
stmtsHistoryElemMax uint64 = 1024
|
||||
)
|
||||
|
||||
var (
|
||||
stmtInfos = make(map[reflect.Type]*statementInfo)
|
||||
)
|
||||
|
||||
func (ps *perfSchema) RegisterStatement(category, name string, elem interface{}) {
|
||||
instrumentName := fmt.Sprintf("%s%s/%s", statementInstrumentPrefix, category, name)
|
||||
key, err := ps.addInstrument(instrumentName)
|
||||
if err != nil {
|
||||
// just ignore, do nothing else.
|
||||
log.Errorf("Unable to register instrument %s", instrumentName)
|
||||
return
|
||||
}
|
||||
|
||||
stmtInfos[reflect.TypeOf(elem)] = &statementInfo{
|
||||
key: key,
|
||||
name: instrumentName,
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *perfSchema) StartStatement(sql string, connID uint64, callerName EnumCallerName, elem interface{}) *StatementState {
|
||||
stmtType := reflect.TypeOf(elem)
|
||||
info, ok := stmtInfos[stmtType]
|
||||
if !ok {
|
||||
// just ignore, do nothing else.
|
||||
log.Errorf("No instrument registered for statement %s", stmtType)
|
||||
return nil
|
||||
}
|
||||
|
||||
// check and apply the configuration parameter in table setup_timers.
|
||||
timerName, err := ps.getTimerName(flagStatement)
|
||||
if err != nil {
|
||||
// just ignore, do nothing else.
|
||||
log.Error("Unable to check setup_timers table")
|
||||
return nil
|
||||
}
|
||||
var timerStart int64
|
||||
switch timerName {
|
||||
case timerNameNanosec:
|
||||
timerStart = time.Now().UnixNano()
|
||||
case timerNameMicrosec:
|
||||
timerStart = time.Now().UnixNano() / int64(time.Microsecond)
|
||||
case timerNameMillisec:
|
||||
timerStart = time.Now().UnixNano() / int64(time.Millisecond)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: check and apply the additional configuration parameters in:
|
||||
// - table setup_actors
|
||||
// - table setup_setup_consumers
|
||||
// - table setup_instruments
|
||||
// - table setup_objects
|
||||
|
||||
var source string
|
||||
source, ok = callerNames[callerName]
|
||||
if !ok {
|
||||
_, fileName, fileLine, ok := runtime.Caller(1)
|
||||
if !ok {
|
||||
// just ignore, do nothing else.
|
||||
log.Error("Unable to get runtime.Caller(1)")
|
||||
return nil
|
||||
}
|
||||
source = fmt.Sprintf("%s:%d", fileName, fileLine)
|
||||
callerNames[callerName] = source
|
||||
}
|
||||
|
||||
return &StatementState{
|
||||
connID: connID,
|
||||
info: info,
|
||||
stmtType: stmtType,
|
||||
source: source,
|
||||
timerName: timerName,
|
||||
timerStart: timerStart,
|
||||
sqlText: sql,
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *perfSchema) EndStatement(state *StatementState) {
|
||||
if state == nil {
|
||||
return
|
||||
}
|
||||
|
||||
switch state.timerName {
|
||||
case timerNameNanosec:
|
||||
state.timerEnd = time.Now().UnixNano()
|
||||
case timerNameMicrosec:
|
||||
state.timerEnd = time.Now().UnixNano() / int64(time.Microsecond)
|
||||
case timerNameMillisec:
|
||||
state.timerEnd = time.Now().UnixNano() / int64(time.Millisecond)
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("EndStatement: sql %s, connection id %d, type %s", state.sqlText, state.connID, state.stmtType)
|
||||
|
||||
record := state2Record(state)
|
||||
err := ps.updateEventsStmtsCurrent(state.connID, record)
|
||||
if err != nil {
|
||||
log.Error("Unable to update events_statements_current table")
|
||||
}
|
||||
err = ps.appendEventsStmtsHistory(record)
|
||||
if err != nil {
|
||||
log.Error("Unable to append to events_statements_history table")
|
||||
}
|
||||
}
|
||||
|
||||
func state2Record(state *StatementState) []interface{} {
|
||||
return []interface{}{
|
||||
state.connID, // THREAD_ID
|
||||
state.info.key, // EVENT_ID
|
||||
nil, // END_EVENT_ID
|
||||
state.info.name, // EVENT_NAME
|
||||
state.source, // SOURCE
|
||||
uint64(state.timerStart), // TIMER_START
|
||||
uint64(state.timerEnd), // TIMER_END
|
||||
nil, // TIMER_WAIT
|
||||
uint64(state.lockTime), // LOCK_TIME
|
||||
state.sqlText, // SQL_TEXT
|
||||
nil, // DIGEST
|
||||
nil, // DIGEST_TEXT
|
||||
state.schemaName, // CURRENT_SCHEMA
|
||||
nil, // OBJECT_TYPE
|
||||
nil, // OBJECT_SCHEMA
|
||||
nil, // OBJECT_NAME
|
||||
nil, // OBJECT_INSTANCE_BEGIN
|
||||
nil, // MYSQL_ERRNO,
|
||||
nil, // RETURNED_SQLSTATE
|
||||
nil, // MESSAGE_TEXT
|
||||
uint64(state.errNum), // ERRORS
|
||||
uint64(state.warnNum), // WARNINGS
|
||||
state.rowsAffected, // ROWS_AFFECTED
|
||||
state.rowsSent, // ROWS_SENT
|
||||
state.rowsExamined, // ROWS_EXAMINED
|
||||
uint64(state.createdTmpDiskTables), // CREATED_TMP_DISK_TABLES
|
||||
uint64(state.createdTmpTables), // CREATED_TMP_TABLES
|
||||
uint64(state.selectFullJoin), // SELECT_FULL_JOIN
|
||||
uint64(state.selectFullRangeJoin), // SELECT_FULL_RANGE_JOIN
|
||||
uint64(state.selectRange), // SELECT_RANGE
|
||||
uint64(state.selectRangeCheck), // SELECT_RANGE_CHECK
|
||||
uint64(state.selectScan), // SELECT_SCAN
|
||||
uint64(state.sortMergePasses), // SORT_MERGE_PASSES
|
||||
uint64(state.sortRange), // SORT_RANGE
|
||||
uint64(state.sortRows), // SORT_ROWS
|
||||
uint64(state.sortScan), // SORT_SCAN
|
||||
uint64(state.noIndexUsed), // NO_INDEX_USED
|
||||
uint64(state.noGoodIndexUsed), // NO_GOOD_INDEX_USED
|
||||
nil, // NESTING_EVENT_ID
|
||||
nil, // NESTING_EVENT_TYPE
|
||||
nil, // NESTING_EVENT_LEVEL
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *perfSchema) updateEventsStmtsCurrent(connID uint64, record []interface{}) error {
|
||||
store := ps.stores[TableStmtsCurrent]
|
||||
|
||||
batch := pool.Get().(*leveldb.Batch)
|
||||
defer func() {
|
||||
batch.Reset()
|
||||
pool.Put(batch)
|
||||
}()
|
||||
|
||||
rawKey := []interface{}{connID}
|
||||
key, err := codec.EncodeKey(nil, rawKey...)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
val, err := codec.EncodeValue(nil, record...)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
batch.Put(key, val)
|
||||
|
||||
err = store.Write(batch, nil)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
func (ps *perfSchema) appendEventsStmtsHistory(record []interface{}) error {
|
||||
store := ps.stores[TableStmtsHistory]
|
||||
lastLsn := atomic.AddUint64(ps.lsns[TableStmtsHistory], 1)
|
||||
|
||||
batch := pool.Get().(*leveldb.Batch)
|
||||
defer func() {
|
||||
batch.Reset()
|
||||
pool.Put(batch)
|
||||
}()
|
||||
|
||||
lsn := lastLsn - 1
|
||||
rawKey := []interface{}{uint64(lsn)}
|
||||
key, err := codec.EncodeKey(nil, rawKey...)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
val, err := codec.EncodeValue(nil, record...)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
batch.Put(key, val)
|
||||
|
||||
// Ensure no more than stmtsHistoryElemMax elements in this table.
|
||||
if lastLsn > stmtsHistoryElemMax {
|
||||
rawKey = []interface{}{uint64(lastLsn - stmtsHistoryElemMax)}
|
||||
key, err = codec.EncodeKey(nil, rawKey...)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
batch.Delete(key)
|
||||
}
|
||||
|
||||
err = store.Write(batch, nil)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
func init() {
|
||||
// Existing instrument names are the same as MySQL 5.7
|
||||
PerfHandle.RegisterStatement("sql", "create_db", (*ast.CreateDatabaseStmt)(nil))
|
||||
PerfHandle.RegisterStatement("sql", "drop_db", (*ast.DropDatabaseStmt)(nil))
|
||||
PerfHandle.RegisterStatement("sql", "create_table", (*ast.CreateTableStmt)(nil))
|
||||
PerfHandle.RegisterStatement("sql", "drop_table", (*ast.DropTableStmt)(nil))
|
||||
PerfHandle.RegisterStatement("sql", "create_index", (*ast.CreateIndexStmt)(nil))
|
||||
PerfHandle.RegisterStatement("sql", "drop_index", (*ast.DropIndexStmt)(nil))
|
||||
PerfHandle.RegisterStatement("sql", "truncate", (*ast.TruncateTableStmt)(nil))
|
||||
PerfHandle.RegisterStatement("sql", "select", (*ast.SelectStmt)(nil))
|
||||
PerfHandle.RegisterStatement("sql", "union", (*ast.UnionStmt)(nil))
|
||||
PerfHandle.RegisterStatement("sql", "insert", (*ast.InsertStmt)(nil))
|
||||
PerfHandle.RegisterStatement("sql", "delete", (*ast.DeleteStmt)(nil))
|
||||
PerfHandle.RegisterStatement("sql", "update", (*ast.UpdateStmt)(nil))
|
||||
PerfHandle.RegisterStatement("sql", "show", (*ast.ShowStmt)(nil))
|
||||
}
|
||||
@ -119,7 +119,10 @@ func (s *testTableRsetSuite) TestTableSourceString(c *C) {
|
||||
se := newSession(c, store, s.dbName)
|
||||
ctx, ok := se.(context.Context)
|
||||
c.Assert(ok, IsTrue)
|
||||
stmtList, err := tidb.Compile(ctx, s.querySql)
|
||||
rawStmtList, err := tidb.Parse(ctx, s.querySql)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(len(rawStmtList), Greater, 0)
|
||||
stmtList, err := tidb.Compile(ctx, rawStmtList)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(len(stmtList), Greater, 0)
|
||||
|
||||
|
||||
22
session.go
22
session.go
@ -34,6 +34,7 @@ import (
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/meta"
|
||||
"github.com/pingcap/tidb/mysql"
|
||||
"github.com/pingcap/tidb/perfschema"
|
||||
"github.com/pingcap/tidb/privilege"
|
||||
"github.com/pingcap/tidb/privilege/privileges"
|
||||
"github.com/pingcap/tidb/rset"
|
||||
@ -122,6 +123,9 @@ type session struct {
|
||||
maxRetryCnt int // Max retry times. If maxRetryCnt <=0, there is no limitation for retry times.
|
||||
|
||||
debugInfos map[string]interface{} // Vars for debug and unit tests.
|
||||
|
||||
// For performance_schema only.
|
||||
stmtState *perfschema.StatementState
|
||||
}
|
||||
|
||||
func (s *session) Status() uint16 {
|
||||
@ -254,7 +258,11 @@ func (s *session) ExecRestrictedSQL(ctx context.Context, sql string) (rset.Recor
|
||||
// TODO: Maybe we should remove this restriction latter.
|
||||
return nil, errors.New("Should not call ExecRestrictedSQL concurrently.")
|
||||
}
|
||||
statements, err := Compile(ctx, sql)
|
||||
rawStmts, err := Parse(ctx, sql)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
statements, err := Compile(ctx, rawStmts)
|
||||
if err != nil {
|
||||
log.Errorf("Compile %s with error: %v", sql, err)
|
||||
return nil, errors.Trace(err)
|
||||
@ -364,7 +372,11 @@ func (s *session) ShouldAutocommit(ctx context.Context) bool {
|
||||
}
|
||||
|
||||
func (s *session) Execute(sql string) ([]rset.Recordset, error) {
|
||||
statements, err := Compile(s, sql)
|
||||
rawStmts, err := Parse(s, sql)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
statements, err := Compile(s, rawStmts)
|
||||
if err != nil {
|
||||
log.Errorf("Syntax error: %s", sql)
|
||||
log.Errorf("Error occurs at %s.", err)
|
||||
@ -372,9 +384,11 @@ func (s *session) Execute(sql string) ([]rset.Recordset, error) {
|
||||
}
|
||||
|
||||
var rs []rset.Recordset
|
||||
|
||||
for _, st := range statements {
|
||||
for i, st := range statements {
|
||||
id := variable.GetSessionVars(s).ConnectionID
|
||||
s.stmtState = perfschema.PerfHandle.StartStatement(sql, id, perfschema.CallerNameSessionExecute, rawStmts[i])
|
||||
r, err := runStmt(s, st)
|
||||
perfschema.PerfHandle.EndStatement(s.stmtState)
|
||||
if err != nil {
|
||||
log.Warnf("session:%v, err:%v", s, err)
|
||||
return nil, errors.Trace(err)
|
||||
|
||||
@ -24,7 +24,10 @@ import (
|
||||
func (s *testStmtSuite) TestExplain(c *C) {
|
||||
testSQL := "explain select 1"
|
||||
|
||||
stmtList, err := tidb.Compile(s.ctx, testSQL)
|
||||
rawStmtList, err := tidb.Parse(s.ctx, testSQL)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(rawStmtList, HasLen, 1)
|
||||
stmtList, err := tidb.Compile(s.ctx, rawStmtList)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(stmtList, HasLen, 1)
|
||||
|
||||
@ -48,7 +51,10 @@ func (s *testStmtSuite) TestExplain(c *C) {
|
||||
|
||||
showColumnSQL := "desc t;"
|
||||
|
||||
stmtList, err = tidb.Compile(s.ctx, showColumnSQL)
|
||||
rawStmtList, err = tidb.Parse(s.ctx, showColumnSQL)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(rawStmtList, HasLen, 1)
|
||||
stmtList, err = tidb.Compile(s.ctx, rawStmtList)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(stmtList, HasLen, 1)
|
||||
|
||||
|
||||
8
tidb.go
8
tidb.go
@ -127,6 +127,7 @@ func getCtxCharsetInfo(ctx context.Context) (string, string) {
|
||||
|
||||
// Parse parses a query string to raw ast.StmtNode.
|
||||
func Parse(ctx context.Context, src string) ([]ast.StmtNode, error) {
|
||||
log.Debug("compiling", src)
|
||||
charset, collation := getCtxCharsetInfo(ctx)
|
||||
stmts, err := parser.Parse(src, charset, collation)
|
||||
if err != nil {
|
||||
@ -137,12 +138,7 @@ func Parse(ctx context.Context, src string) ([]ast.StmtNode, error) {
|
||||
}
|
||||
|
||||
// Compile is safe for concurrent use by multiple goroutines.
|
||||
func Compile(ctx context.Context, src string) ([]stmt.Statement, error) {
|
||||
log.Debug("compiling", src)
|
||||
rawStmt, err := Parse(ctx, src)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
func Compile(ctx context.Context, rawStmt []ast.StmtNode) ([]stmt.Statement, error) {
|
||||
stmts := make([]stmt.Statement, len(rawStmt))
|
||||
for i, v := range rawStmt {
|
||||
compiler := &executor.Compiler{}
|
||||
|
||||
Reference in New Issue
Block a user