From b16875f25088ff27aa9c1be51e022851aa237b58 Mon Sep 17 00:00:00 2001 From: nieyy Date: Tue, 23 Feb 2016 20:38:44 +0800 Subject: [PATCH] performance_schema: Add initial support for statement instruments --- perfschema/common.go | 12 +- perfschema/instrument.go | 124 +++++++++++++ perfschema/perfschema.go | 12 ++ perfschema/perfschema_test.go | 15 ++ perfschema/statement.go | 334 ++++++++++++++++++++++++++++++++++ rset/rsets/from_test.go | 5 +- session.go | 22 ++- stmt/stmts/explain_test.go | 10 +- tidb.go | 8 +- 9 files changed, 528 insertions(+), 14 deletions(-) create mode 100644 perfschema/instrument.go create mode 100644 perfschema/statement.go diff --git a/perfschema/common.go b/perfschema/common.go index 63e5684fc5..523c6e560b 100644 --- a/perfschema/common.go +++ b/perfschema/common.go @@ -65,9 +65,19 @@ func decodeValue(data []byte, cols []*model.ColumnInfo) ([]interface{}, error) { var rvalues []interface{} for i, col := range cols { + if values[i] == nil { + rvalues = append(rvalues, nil) + continue + } // TODO: support more types if we really need. switch col.Tp { - case mysql.TypeString, mysql.TypeVarchar: + case mysql.TypeLong: + val := values[i].(int64) + rvalues = append(rvalues, val) + case mysql.TypeLonglong: + val := values[i].(uint64) + rvalues = append(rvalues, val) + case mysql.TypeString, mysql.TypeVarchar, mysql.TypeLongBlob: val := string(values[i].([]byte)) rvalues = append(rvalues, val) case mysql.TypeEnum: diff --git a/perfschema/instrument.go b/perfschema/instrument.go new file mode 100644 index 0000000000..903ca2590f --- /dev/null +++ b/perfschema/instrument.go @@ -0,0 +1,124 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package perfschema + +import ( + "sync/atomic" + + "github.com/juju/errors" + "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/util/codec" + "github.com/syndtr/goleveldb/leveldb" +) + +// EnumCallerName is used as a parameter to avoid calling runtime.Caller(1) since +// it is too expensive (500ns+ per call), we don't want to invoke it repeatedly for +// each instrument. +type EnumCallerName int + +const ( + // CallerNameSessionExecute is for session.go:Execute() method. + CallerNameSessionExecute EnumCallerName = iota + 1 +) + +const ( + stageInstrumentPrefix = "stage/" + statementInstrumentPrefix = "statement/" + transactionInstrumentPrefix = "transaction" +) + +// Flag indicators for table setup_timers. +const ( + flagStage = iota + 1 + flagStatement + flagTransaction +) + +type enumTimerName int + +// Enum values for the TIMER_NAME columns. +// This enum is found in the following tables: +// - performance_schema.setup_timer (TIMER_NAME) +const ( + timerNameNone enumTimerName = iota + timerNameNanosec + timerNameMicrosec + timerNameMillisec +) + +var ( + callerNames = make(map[EnumCallerName]string) +) + +// addInstrument is used to add an item to setup_instruments table. 
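// It returns the new row's key (derived from the table's LSN counter), which
// RegisterStatement stores as the instrument's key. An illustrative call
// (the instrument name below is hypothetical, not one registered by this patch):
//
//	key, err := ps.addInstrument("statement/sql/example")
//	if err != nil {
//		return errors.Trace(err)
//	}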
+func (ps *perfSchema) addInstrument(name string) (uint64, error) { + store := ps.stores[TableSetupInstruments] + lastLsn := atomic.AddUint64(ps.lsns[TableSetupInstruments], 1) + + batch := pool.Get().(*leveldb.Batch) + defer func() { + batch.Reset() + pool.Put(batch) + }() + + record := []interface{}{name, mysql.Enum{Name: "YES", Value: 1}, mysql.Enum{Name: "YES", Value: 1}} + lsn := lastLsn - 1 + rawKey := []interface{}{uint64(lsn)} + key, err := codec.EncodeKey(nil, rawKey...) + if err != nil { + return 0, errors.Trace(err) + } + val, err := codec.EncodeValue(nil, record...) + if err != nil { + return 0, errors.Trace(err) + } + batch.Put(key, val) + + err = store.Write(batch, nil) + return lsn, errors.Trace(err) +} + +func (ps *perfSchema) getTimerName(flag int) (enumTimerName, error) { + store := ps.stores[TableSetupTimers] + + rawKey := []interface{}{uint64(flag)} + key, err := codec.EncodeKey(nil, rawKey...) + if err != nil { + return timerNameNone, errors.Trace(err) + } + + val, err := store.Get(key, nil) + if err != nil { + return timerNameNone, errors.Trace(err) + } + + record, err := decodeValue(val, ps.tables[TableSetupTimers].Columns) + if err != nil { + return timerNameNone, errors.Trace(err) + } + + timerName, ok := record[1].(string) + if !ok { + return timerNameNone, errors.New("Timer type does not match") + } + switch timerName { + case "NANOSECOND": + return timerNameNanosec, nil + case "MICROSECOND": + return timerNameMicrosec, nil + case "MILLISECOND": + return timerNameMillisec, nil + } + return timerNameNone, nil +} diff --git a/perfschema/perfschema.go b/perfschema/perfschema.go index 49046d6f07..dabd0bed72 100644 --- a/perfschema/perfschema.go +++ b/perfschema/perfschema.go @@ -22,6 +22,15 @@ import ( "github.com/syndtr/goleveldb/leveldb" ) +// StatementInstrument defines the methods for statement instrumentation points +type StatementInstrument interface { + RegisterStatement(category, name string, elem interface{}) + + StartStatement(sql string, connID uint64, callerName EnumCallerName, elem interface{}) *StatementState + + EndStatement(state *StatementState) +} + // PerfSchema defines the methods to be invoked by the executor type PerfSchema interface { // For SELECT statement only. @@ -29,6 +38,9 @@ type PerfSchema interface { // For INSERT statement only. ExecInsert(insertVals *InsertValues) error + + // For statement instrumentation only. 
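	//
	// Embedding StatementInstrument makes these calls available on the
	// package-level PerfHandle. An illustrative caller-side sketch (connID and
	// stmtNode stand in for the caller's real values); EndStatement is a no-op
	// for a nil state, so the deferred call is safe even when instrumentation
	// is skipped:
	//
	//	state := PerfHandle.StartStatement(sql, connID, CallerNameSessionExecute, stmtNode)
	//	defer PerfHandle.EndStatement(state)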
+ StatementInstrument } type perfSchema struct { diff --git a/perfschema/perfschema_test.go b/perfschema/perfschema_test.go index 5bba50bdbf..852f310d23 100644 --- a/perfschema/perfschema_test.go +++ b/perfschema/perfschema_test.go @@ -171,3 +171,18 @@ func (p *testPerfSchemaSuit) TestInsert(c *C) { mustFailExec(c, testDB, `insert into performance_schema.setup_consumers values("events_stages_current", "YES");`) mustFailExec(c, testDB, `insert into performance_schema.setup_timers values("timer1", "NANOSECOND");`) } + +func (p *testPerfSchemaSuit) TestInstrument(c *C) { + testDB, err := sql.Open(tidb.DriverName, tidb.EngineGoLevelDBMemory+"/test/test") + c.Assert(err, IsNil) + defer testDB.Close() + + cnt := mustQuery(c, testDB, "select * from performance_schema.setup_instruments") + c.Assert(cnt, Greater, 0) + + mustExec(c, testDB, "show tables") + cnt = mustQuery(c, testDB, "select * from performance_schema.events_statements_current") + c.Assert(cnt, Equals, 1) + cnt = mustQuery(c, testDB, "select * from performance_schema.events_statements_history") + c.Assert(cnt, Greater, 0) +} diff --git a/perfschema/statement.go b/perfschema/statement.go new file mode 100644 index 0000000000..77a8bbe9bc --- /dev/null +++ b/perfschema/statement.go @@ -0,0 +1,334 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package perfschema + +import ( + "fmt" + "reflect" + "runtime" + "sync/atomic" + "time" + + "github.com/juju/errors" + "github.com/ngaut/log" + "github.com/pingcap/tidb/ast" + "github.com/pingcap/tidb/util/codec" + "github.com/syndtr/goleveldb/leveldb" +) + +// statementInfo defines statement instrument information. +type statementInfo struct { + // The registered statement key + key uint64 + // The name of the statement instrument to register + name string +} + +// StatementState provides temporary storage to a statement runtime statistics. +// TODO: +// 1. support statement digest. +// 2. support prepared statement. 
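//
// Most of the fields below map one-to-one onto columns of
// events_statements_current / events_statements_history (see state2Record),
// so the collected data can be read back with plain SQL, e.g. (an illustrative
// check in the spirit of perfschema_test.go; db is a placeholder *sql.DB):
//
//	rows, err := db.Query("select * from performance_schema.events_statements_history")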
+type StatementState struct { + // Connection identifier + connID uint64 + // Statement information + info *statementInfo + // Statement type + stmtType reflect.Type + // Source file and line number + source string + // Timer name + timerName enumTimerName + // Timer start + timerStart int64 + // Timer end + timerEnd int64 + // Locked time + lockTime int64 + // SQL statement string + sqlText string + // Current schema name + schemaName string + // Number of errors + errNum uint32 + // Number of warnings + warnNum uint32 + // Rows affected + rowsAffected uint64 + // Rows sent + rowsSent uint64 + // Rows examined + rowsExamined uint64 + // Metric, temporary tables created on disk + createdTmpDiskTables uint32 + // Metric, temproray tables created + createdTmpTables uint32 + // Metric, number of select full join + selectFullJoin uint32 + // Metric, number of select full range join + selectFullRangeJoin uint32 + // Metric, number of select range + selectRange uint32 + // Metric, number of select range check + selectRangeCheck uint32 + // Metric, number of select scan + selectScan uint32 + // Metric, number of sort merge passes + sortMergePasses uint32 + // Metric, number of sort merge + sortRange uint32 + // Metric, number of sort rows + sortRows uint32 + // Metric, number of sort scans + sortScan uint32 + // Metric, no index used flag + noIndexUsed uint8 + // Metric, no good index used flag + noGoodIndexUsed uint8 +} + +const ( + // Maximum allowed number of elements in table events_statements_history. + // TODO: make it configurable? + stmtsHistoryElemMax uint64 = 1024 +) + +var ( + stmtInfos = make(map[reflect.Type]*statementInfo) +) + +func (ps *perfSchema) RegisterStatement(category, name string, elem interface{}) { + instrumentName := fmt.Sprintf("%s%s/%s", statementInstrumentPrefix, category, name) + key, err := ps.addInstrument(instrumentName) + if err != nil { + // just ignore, do nothing else. + log.Errorf("Unable to register instrument %s", instrumentName) + return + } + + stmtInfos[reflect.TypeOf(elem)] = &statementInfo{ + key: key, + name: instrumentName, + } +} + +func (ps *perfSchema) StartStatement(sql string, connID uint64, callerName EnumCallerName, elem interface{}) *StatementState { + stmtType := reflect.TypeOf(elem) + info, ok := stmtInfos[stmtType] + if !ok { + // just ignore, do nothing else. + log.Errorf("No instrument registered for statement %s", stmtType) + return nil + } + + // check and apply the configuration parameter in table setup_timers. + timerName, err := ps.getTimerName(flagStatement) + if err != nil { + // just ignore, do nothing else. + log.Error("Unable to check setup_timers table") + return nil + } + var timerStart int64 + switch timerName { + case timerNameNanosec: + timerStart = time.Now().UnixNano() + case timerNameMicrosec: + timerStart = time.Now().UnixNano() / int64(time.Microsecond) + case timerNameMillisec: + timerStart = time.Now().UnixNano() / int64(time.Millisecond) + default: + return nil + } + + // TODO: check and apply the additional configuration parameters in: + // - table setup_actors + // - table setup_setup_consumers + // - table setup_instruments + // - table setup_objects + + var source string + source, ok = callerNames[callerName] + if !ok { + _, fileName, fileLine, ok := runtime.Caller(1) + if !ok { + // just ignore, do nothing else. 
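			// (A nil return here only means this statement goes uninstrumented;
			// execution itself is unaffected. On success the resolved "file:line"
			// string is cached in callerNames, so runtime.Caller runs at most once
			// per EnumCallerName rather than once per statement.)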
+ log.Error("Unable to get runtime.Caller(1)") + return nil + } + source = fmt.Sprintf("%s:%d", fileName, fileLine) + callerNames[callerName] = source + } + + return &StatementState{ + connID: connID, + info: info, + stmtType: stmtType, + source: source, + timerName: timerName, + timerStart: timerStart, + sqlText: sql, + } +} + +func (ps *perfSchema) EndStatement(state *StatementState) { + if state == nil { + return + } + + switch state.timerName { + case timerNameNanosec: + state.timerEnd = time.Now().UnixNano() + case timerNameMicrosec: + state.timerEnd = time.Now().UnixNano() / int64(time.Microsecond) + case timerNameMillisec: + state.timerEnd = time.Now().UnixNano() / int64(time.Millisecond) + default: + return + } + + log.Debugf("EndStatement: sql %s, connection id %d, type %s", state.sqlText, state.connID, state.stmtType) + + record := state2Record(state) + err := ps.updateEventsStmtsCurrent(state.connID, record) + if err != nil { + log.Error("Unable to update events_statements_current table") + } + err = ps.appendEventsStmtsHistory(record) + if err != nil { + log.Error("Unable to append to events_statements_history table") + } +} + +func state2Record(state *StatementState) []interface{} { + return []interface{}{ + state.connID, // THREAD_ID + state.info.key, // EVENT_ID + nil, // END_EVENT_ID + state.info.name, // EVENT_NAME + state.source, // SOURCE + uint64(state.timerStart), // TIMER_START + uint64(state.timerEnd), // TIMER_END + nil, // TIMER_WAIT + uint64(state.lockTime), // LOCK_TIME + state.sqlText, // SQL_TEXT + nil, // DIGEST + nil, // DIGEST_TEXT + state.schemaName, // CURRENT_SCHEMA + nil, // OBJECT_TYPE + nil, // OBJECT_SCHEMA + nil, // OBJECT_NAME + nil, // OBJECT_INSTANCE_BEGIN + nil, // MYSQL_ERRNO, + nil, // RETURNED_SQLSTATE + nil, // MESSAGE_TEXT + uint64(state.errNum), // ERRORS + uint64(state.warnNum), // WARNINGS + state.rowsAffected, // ROWS_AFFECTED + state.rowsSent, // ROWS_SENT + state.rowsExamined, // ROWS_EXAMINED + uint64(state.createdTmpDiskTables), // CREATED_TMP_DISK_TABLES + uint64(state.createdTmpTables), // CREATED_TMP_TABLES + uint64(state.selectFullJoin), // SELECT_FULL_JOIN + uint64(state.selectFullRangeJoin), // SELECT_FULL_RANGE_JOIN + uint64(state.selectRange), // SELECT_RANGE + uint64(state.selectRangeCheck), // SELECT_RANGE_CHECK + uint64(state.selectScan), // SELECT_SCAN + uint64(state.sortMergePasses), // SORT_MERGE_PASSES + uint64(state.sortRange), // SORT_RANGE + uint64(state.sortRows), // SORT_ROWS + uint64(state.sortScan), // SORT_SCAN + uint64(state.noIndexUsed), // NO_INDEX_USED + uint64(state.noGoodIndexUsed), // NO_GOOD_INDEX_USED + nil, // NESTING_EVENT_ID + nil, // NESTING_EVENT_TYPE + nil, // NESTING_EVENT_LEVEL + } +} + +func (ps *perfSchema) updateEventsStmtsCurrent(connID uint64, record []interface{}) error { + store := ps.stores[TableStmtsCurrent] + + batch := pool.Get().(*leveldb.Batch) + defer func() { + batch.Reset() + pool.Put(batch) + }() + + rawKey := []interface{}{connID} + key, err := codec.EncodeKey(nil, rawKey...) + if err != nil { + return errors.Trace(err) + } + val, err := codec.EncodeValue(nil, record...) 
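	// (The key is just the connection id, so the batch.Put below overwrites
	// that connection's single "current" row on every EndStatement; history
	// rows instead get fresh LSN-based keys in appendEventsStmtsHistory.)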
+ if err != nil { + return errors.Trace(err) + } + batch.Put(key, val) + + err = store.Write(batch, nil) + return errors.Trace(err) +} + +func (ps *perfSchema) appendEventsStmtsHistory(record []interface{}) error { + store := ps.stores[TableStmtsHistory] + lastLsn := atomic.AddUint64(ps.lsns[TableStmtsHistory], 1) + + batch := pool.Get().(*leveldb.Batch) + defer func() { + batch.Reset() + pool.Put(batch) + }() + + lsn := lastLsn - 1 + rawKey := []interface{}{uint64(lsn)} + key, err := codec.EncodeKey(nil, rawKey...) + if err != nil { + return errors.Trace(err) + } + val, err := codec.EncodeValue(nil, record...) + if err != nil { + return errors.Trace(err) + } + batch.Put(key, val) + + // Ensure no more than stmtsHistoryElemMax elements in this table. + if lastLsn > stmtsHistoryElemMax { + rawKey = []interface{}{uint64(lastLsn - stmtsHistoryElemMax)} + key, err = codec.EncodeKey(nil, rawKey...) + if err != nil { + return errors.Trace(err) + } + batch.Delete(key) + } + + err = store.Write(batch, nil) + return errors.Trace(err) +} + +func init() { + // Existing instrument names are the same as MySQL 5.7 + PerfHandle.RegisterStatement("sql", "create_db", (*ast.CreateDatabaseStmt)(nil)) + PerfHandle.RegisterStatement("sql", "drop_db", (*ast.DropDatabaseStmt)(nil)) + PerfHandle.RegisterStatement("sql", "create_table", (*ast.CreateTableStmt)(nil)) + PerfHandle.RegisterStatement("sql", "drop_table", (*ast.DropTableStmt)(nil)) + PerfHandle.RegisterStatement("sql", "create_index", (*ast.CreateIndexStmt)(nil)) + PerfHandle.RegisterStatement("sql", "drop_index", (*ast.DropIndexStmt)(nil)) + PerfHandle.RegisterStatement("sql", "truncate", (*ast.TruncateTableStmt)(nil)) + PerfHandle.RegisterStatement("sql", "select", (*ast.SelectStmt)(nil)) + PerfHandle.RegisterStatement("sql", "union", (*ast.UnionStmt)(nil)) + PerfHandle.RegisterStatement("sql", "insert", (*ast.InsertStmt)(nil)) + PerfHandle.RegisterStatement("sql", "delete", (*ast.DeleteStmt)(nil)) + PerfHandle.RegisterStatement("sql", "update", (*ast.UpdateStmt)(nil)) + PerfHandle.RegisterStatement("sql", "show", (*ast.ShowStmt)(nil)) +} diff --git a/rset/rsets/from_test.go b/rset/rsets/from_test.go index abbcfceb22..8f8dd0c793 100644 --- a/rset/rsets/from_test.go +++ b/rset/rsets/from_test.go @@ -119,7 +119,10 @@ func (s *testTableRsetSuite) TestTableSourceString(c *C) { se := newSession(c, store, s.dbName) ctx, ok := se.(context.Context) c.Assert(ok, IsTrue) - stmtList, err := tidb.Compile(ctx, s.querySql) + rawStmtList, err := tidb.Parse(ctx, s.querySql) + c.Assert(err, IsNil) + c.Assert(len(rawStmtList), Greater, 0) + stmtList, err := tidb.Compile(ctx, rawStmtList) c.Assert(err, IsNil) c.Assert(len(stmtList), Greater, 0) diff --git a/session.go b/session.go index 65401b1e63..7b097465e1 100644 --- a/session.go +++ b/session.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/perfschema" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/rset" @@ -122,6 +123,9 @@ type session struct { maxRetryCnt int // Max retry times. If maxRetryCnt <=0, there is no limitation for retry times. debugInfos map[string]interface{} // Vars for debug and unit tests. + + // For performance_schema only. 
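	// stmtState carries the in-flight statement's instrumentation state between
	// StartStatement and EndStatement. It may be nil (e.g. when the statement
	// type has no registered instrument), and EndStatement treats nil as a no-op.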
+ stmtState *perfschema.StatementState } func (s *session) Status() uint16 { @@ -254,7 +258,11 @@ func (s *session) ExecRestrictedSQL(ctx context.Context, sql string) (rset.Recor // TODO: Maybe we should remove this restriction latter. return nil, errors.New("Should not call ExecRestrictedSQL concurrently.") } - statements, err := Compile(ctx, sql) + rawStmts, err := Parse(ctx, sql) + if err != nil { + return nil, errors.Trace(err) + } + statements, err := Compile(ctx, rawStmts) if err != nil { log.Errorf("Compile %s with error: %v", sql, err) return nil, errors.Trace(err) @@ -364,7 +372,11 @@ func (s *session) ShouldAutocommit(ctx context.Context) bool { } func (s *session) Execute(sql string) ([]rset.Recordset, error) { - statements, err := Compile(s, sql) + rawStmts, err := Parse(s, sql) + if err != nil { + return nil, errors.Trace(err) + } + statements, err := Compile(s, rawStmts) if err != nil { log.Errorf("Syntax error: %s", sql) log.Errorf("Error occurs at %s.", err) @@ -372,9 +384,11 @@ func (s *session) Execute(sql string) ([]rset.Recordset, error) { } var rs []rset.Recordset - - for _, st := range statements { + for i, st := range statements { + id := variable.GetSessionVars(s).ConnectionID + s.stmtState = perfschema.PerfHandle.StartStatement(sql, id, perfschema.CallerNameSessionExecute, rawStmts[i]) r, err := runStmt(s, st) + perfschema.PerfHandle.EndStatement(s.stmtState) if err != nil { log.Warnf("session:%v, err:%v", s, err) return nil, errors.Trace(err) diff --git a/stmt/stmts/explain_test.go b/stmt/stmts/explain_test.go index 3d0454286e..919c0384be 100644 --- a/stmt/stmts/explain_test.go +++ b/stmt/stmts/explain_test.go @@ -24,7 +24,10 @@ import ( func (s *testStmtSuite) TestExplain(c *C) { testSQL := "explain select 1" - stmtList, err := tidb.Compile(s.ctx, testSQL) + rawStmtList, err := tidb.Parse(s.ctx, testSQL) + c.Assert(err, IsNil) + c.Assert(rawStmtList, HasLen, 1) + stmtList, err := tidb.Compile(s.ctx, rawStmtList) c.Assert(err, IsNil) c.Assert(stmtList, HasLen, 1) @@ -48,7 +51,10 @@ func (s *testStmtSuite) TestExplain(c *C) { showColumnSQL := "desc t;" - stmtList, err = tidb.Compile(s.ctx, showColumnSQL) + rawStmtList, err = tidb.Parse(s.ctx, showColumnSQL) + c.Assert(err, IsNil) + c.Assert(rawStmtList, HasLen, 1) + stmtList, err = tidb.Compile(s.ctx, rawStmtList) c.Assert(err, IsNil) c.Assert(stmtList, HasLen, 1) diff --git a/tidb.go b/tidb.go index dec3b2815c..0bfefbbd5b 100644 --- a/tidb.go +++ b/tidb.go @@ -127,6 +127,7 @@ func getCtxCharsetInfo(ctx context.Context) (string, string) { // Parse parses a query string to raw ast.StmtNode. func Parse(ctx context.Context, src string) ([]ast.StmtNode, error) { + log.Debug("compiling", src) charset, collation := getCtxCharsetInfo(ctx) stmts, err := parser.Parse(src, charset, collation) if err != nil { @@ -137,12 +138,7 @@ func Parse(ctx context.Context, src string) ([]ast.StmtNode, error) { } // Compile is safe for concurrent use by multiple goroutines. -func Compile(ctx context.Context, src string) ([]stmt.Statement, error) { - log.Debug("compiling", src) - rawStmt, err := Parse(ctx, src) - if err != nil { - return nil, errors.Trace(err) - } +func Compile(ctx context.Context, rawStmt []ast.StmtNode) ([]stmt.Statement, error) { stmts := make([]stmt.Statement, len(rawStmt)) for i, v := range rawStmt { compiler := &executor.Compiler{}
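
To make the reworked call sequence above concrete, here is a minimal caller-side sketch (not part of the patch): parse first, then compile the returned ast.StmtNode slice, and bracket each statement's execution with StartStatement/EndStatement, as the updated session.Execute does. The names executeInstrumented and runOne and the connID parameter are illustrative stand-ins, and the import paths assume the repository layout at this commit.

package example

import (
	"github.com/juju/errors"
	"github.com/pingcap/tidb"
	"github.com/pingcap/tidb/context"
	"github.com/pingcap/tidb/perfschema"
	"github.com/pingcap/tidb/rset"
	"github.com/pingcap/tidb/stmt"
)

// executeInstrumented mirrors the updated session.Execute flow: Parse, Compile,
// then run each statement with its events_statements_* bookkeeping around it.
func executeInstrumented(ctx context.Context, sql string, connID uint64,
	runOne func(stmt.Statement) (rset.Recordset, error)) ([]rset.Recordset, error) {
	rawStmts, err := tidb.Parse(ctx, sql)
	if err != nil {
		return nil, errors.Trace(err)
	}
	statements, err := tidb.Compile(ctx, rawStmts)
	if err != nil {
		return nil, errors.Trace(err)
	}
	var rs []rset.Recordset
	for i, st := range statements {
		// rawStmts[i] picks the instrument registered for this AST node type in init().
		state := perfschema.PerfHandle.StartStatement(sql, connID, perfschema.CallerNameSessionExecute, rawStmts[i])
		r, err := runOne(st)
		perfschema.PerfHandle.EndStatement(state)
		if err != nil {
			return nil, errors.Trace(err)
		}
		if r != nil {
			rs = append(rs, r)
		}
	}
	return rs, nil
}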