From e7e31fae3fa9cc55edffaae09c5afdbd82bb535e Mon Sep 17 00:00:00 2001 From: lysu Date: Wed, 28 Nov 2018 20:37:50 +0800 Subject: [PATCH] *: add preparedStmt metric and add limit to max prepareStmt (#8405) --- executor/prepared.go | 5 ++- metrics/metrics.go | 1 + metrics/server.go | 7 ++++ planner/core/prepare_test.go | 61 ++++++++++++++++++++++++++++++++ session/session.go | 6 +++- sessionctx/variable/session.go | 44 +++++++++++++++++++++++ sessionctx/variable/sysvar.go | 55 +++++++++++++++------------- sessionctx/variable/tidb_vars.go | 1 + sessionctx/variable/varsutil.go | 2 ++ 9 files changed, 153 insertions(+), 29 deletions(-) diff --git a/executor/prepared.go b/executor/prepared.go index e7c5d3d340..d2cf1c6b9b 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -177,8 +177,7 @@ func (e *PrepareExec) Next(ctx context.Context, chk *chunk.Chunk) error { if e.name != "" { vars.PreparedStmtNameToID[e.name] = e.ID } - vars.PreparedStmts[e.ID] = prepared - return nil + return vars.AddPreparedStmt(e.ID, prepared) } // ExecuteExec represents an EXECUTE executor. @@ -242,7 +241,7 @@ func (e *DeallocateExec) Next(ctx context.Context, chk *chunk.Chunk) error { vars, id, vars.PreparedStmts[id].SchemaVersion, )) } - delete(vars.PreparedStmts, id) + vars.RemovePreparedStmt(id) return nil } diff --git a/metrics/metrics.go b/metrics/metrics.go index ef822d4b8a..604fe81463 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -56,6 +56,7 @@ func RegisterMetrics() { prometheus.MustRegister(BatchAddIdxHistogram) prometheus.MustRegister(CampaignOwnerCounter) prometheus.MustRegister(ConnGauge) + prometheus.MustRegister(PreparedStmtGauge) prometheus.MustRegister(CriticalErrorCounter) prometheus.MustRegister(DDLCounter) prometheus.MustRegister(DDLWorkerHistogram) diff --git a/metrics/server.go b/metrics/server.go index c8d5604a31..53ca6df120 100644 --- a/metrics/server.go +++ b/metrics/server.go @@ -48,6 +48,13 @@ var ( Help: "Number of connections.", }) + PreparedStmtGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "prepared_stmts", + Help: "number of prepared statements.", + }) + ExecuteErrorCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "tidb", diff --git a/planner/core/prepare_test.go b/planner/core/prepare_test.go index 929b3ea020..38f914870f 100644 --- a/planner/core/prepare_test.go +++ b/planner/core/prepare_test.go @@ -14,16 +14,20 @@ package core_test import ( + "strconv" "time" . "github.com/pingcap/check" + "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" + "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" ) @@ -217,3 +221,60 @@ func (s *testPrepareSuite) TestPrepareCacheNow(c *C) { rs = tk.MustQuery("execute stmt4").Rows() c.Assert(rs[0][0].(string), Equals, rs[0][2].(string)) } + +func (s *testPrepareSuite) TestPrepareOverMaxPreparedStmtCount(c *C) { + defer testleak.AfterTest(c)() + store, dom, err := newStoreWithBootstrap() + c.Assert(err, IsNil) + tk := testkit.NewTestKit(c, store) + defer func() { + dom.Close() + store.Close() + }() + tk.MustExec("use test") + + // test prepare and deallocate. + prePrepared := readGaugeInt(metrics.PreparedStmtGauge) + tk.MustExec(`prepare stmt1 from "select 1"`) + onePrepared := readGaugeInt(metrics.PreparedStmtGauge) + c.Assert(prePrepared+1, Equals, onePrepared) + tk.MustExec(`deallocate prepare stmt1`) + deallocPrepared := readGaugeInt(metrics.PreparedStmtGauge) + c.Assert(prePrepared, Equals, deallocPrepared) + + // test change global limit and make it affected in test session. + tk.MustQuery("select @@max_prepared_stmt_count").Check(testkit.Rows("-1")) + tk.MustExec("set @@global.max_prepared_stmt_count = 2") + tk.MustQuery("select @@global.max_prepared_stmt_count").Check(testkit.Rows("2")) + time.Sleep(3 * time.Second) // renew a session after 2 sec + + // test close session to give up all prepared stmt + tk.MustExec(`prepare stmt2 from "select 1"`) + prePrepared = readGaugeInt(metrics.PreparedStmtGauge) + tk.Se.Close() + drawPrepared := readGaugeInt(metrics.PreparedStmtGauge) + c.Assert(prePrepared-1, Equals, drawPrepared) + + // test meet max limit. + tk.Se = nil + tk.MustQuery("select @@max_prepared_stmt_count").Check(testkit.Rows("2")) + for i := 1; ; i++ { + prePrepared = readGaugeInt(metrics.PreparedStmtGauge) + if prePrepared >= 2 { + _, err = tk.Exec(`prepare stmt` + strconv.Itoa(i) + ` from "select 1"`) + c.Assert(terror.ErrorEqual(err, variable.ErrMaxPreparedStmtCountReached), IsTrue) + break + } else { + tk.Exec(`prepare stmt` + strconv.Itoa(i) + ` from "select 1"`) + } + } +} + +func readGaugeInt(g prometheus.Gauge) int { + ch := make(chan prometheus.Metric, 1) + g.Collect(ch) + m := <-ch + mm := &dto.Metric{} + m.Write(mm) + return int(mm.GetGauge().GetValue()) +} diff --git a/session/session.go b/session/session.go index 7c1b94e652..67446ad60f 100644 --- a/session/session.go +++ b/session/session.go @@ -189,7 +189,7 @@ func (s *session) cleanRetryInfo() { } s.PreparedPlanCache().Delete(cacheKey) } - delete(s.sessionVars.PreparedStmts, stmtID) + s.sessionVars.RemovePreparedStmt(stmtID) } } @@ -1097,6 +1097,9 @@ func (s *session) Close() { if err := s.RollbackTxn(ctx); err != nil { log.Error("session Close error:", errors.ErrorStack(err)) } + if s.sessionVars != nil { + s.sessionVars.WithdrawAllPreparedStmt() + } } // GetSessionVars implements the context.Context interface. @@ -1381,6 +1384,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab variable.TimeZone + quoteCommaQuote + variable.BlockEncryptionMode + quoteCommaQuote + variable.WaitTimeout + quoteCommaQuote + + variable.MaxPreparedStmtCount + quoteCommaQuote + /* TiDB specific global variables: */ variable.TiDBSkipUTF8Check + quoteCommaQuote + variable.TiDBIndexJoinBatchSize + quoteCommaQuote + diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index c989c16500..4b413bec8d 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -16,6 +16,7 @@ package variable import ( "crypto/tls" "fmt" + "strconv" "strings" "sync" "sync/atomic" @@ -31,6 +32,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -44,6 +46,8 @@ const ( codeSnapshotTooOld terror.ErrCode = 3 ) +var preparedStmtCount int64 + // Error instances. var ( errCantGetValidID = terror.ClassVariable.New(codeCantGetValidID, "cannot get valid auto-increment id in retry") @@ -531,6 +535,46 @@ func (s *SessionVars) setDDLReorgPriority(val string) { } } +// AddPreparedStmt adds prepareStmt to current session and count in global. +func (s *SessionVars) AddPreparedStmt(stmtID uint32, stmt *ast.Prepared) error { + if _, exists := s.PreparedStmts[stmtID]; !exists { + valStr, _ := s.GetSystemVar(MaxPreparedStmtCount) + maxPreparedStmtCount, err := strconv.ParseInt(valStr, 10, 64) + if err != nil { + maxPreparedStmtCount = DefMaxPreparedStmtCount + } + newPreparedStmtCount := atomic.AddInt64(&preparedStmtCount, 1) + if maxPreparedStmtCount >= 0 && newPreparedStmtCount > maxPreparedStmtCount { + atomic.AddInt64(&preparedStmtCount, -1) + return ErrMaxPreparedStmtCountReached.GenWithStackByArgs(maxPreparedStmtCount) + } + metrics.PreparedStmtGauge.Set(float64(newPreparedStmtCount)) + } + s.PreparedStmts[stmtID] = stmt + return nil +} + +// RemovePreparedStmt removes preparedStmt from current session and decrease count in global. +func (s *SessionVars) RemovePreparedStmt(stmtID uint32) { + _, exists := s.PreparedStmts[stmtID] + if !exists { + return + } + delete(s.PreparedStmts, stmtID) + afterMinus := atomic.AddInt64(&preparedStmtCount, -1) + metrics.PreparedStmtGauge.Set(float64(afterMinus)) +} + +// WithdrawAllPreparedStmt remove all preparedStmt in current session and decrease count in global. +func (s *SessionVars) WithdrawAllPreparedStmt() { + psCount := len(s.PreparedStmts) + if psCount == 0 { + return + } + afterMinus := atomic.AddInt64(&preparedStmtCount, -int64(psCount)) + metrics.PreparedStmtGauge.Set(float64(afterMinus)) +} + // SetSystemVar sets the value of a system variable. func (s *SessionVars) SetSystemVar(name string, val string) error { switch name { diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index d94da6b5b3..7ac771a579 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -59,27 +59,29 @@ func GetSysVar(name string) *SysVar { // Variable error codes. const ( - CodeUnknownStatusVar terror.ErrCode = 1 - CodeUnknownSystemVar terror.ErrCode = mysql.ErrUnknownSystemVariable - CodeIncorrectScope terror.ErrCode = mysql.ErrIncorrectGlobalLocalVar - CodeUnknownTimeZone terror.ErrCode = mysql.ErrUnknownTimeZone - CodeReadOnly terror.ErrCode = mysql.ErrVariableIsReadonly - CodeWrongValueForVar terror.ErrCode = mysql.ErrWrongValueForVar - CodeWrongTypeForVar terror.ErrCode = mysql.ErrWrongTypeForVar - CodeTruncatedWrongValue terror.ErrCode = mysql.ErrTruncatedWrongValue + CodeUnknownStatusVar terror.ErrCode = 1 + CodeUnknownSystemVar terror.ErrCode = mysql.ErrUnknownSystemVariable + CodeIncorrectScope terror.ErrCode = mysql.ErrIncorrectGlobalLocalVar + CodeUnknownTimeZone terror.ErrCode = mysql.ErrUnknownTimeZone + CodeReadOnly terror.ErrCode = mysql.ErrVariableIsReadonly + CodeWrongValueForVar terror.ErrCode = mysql.ErrWrongValueForVar + CodeWrongTypeForVar terror.ErrCode = mysql.ErrWrongTypeForVar + CodeTruncatedWrongValue terror.ErrCode = mysql.ErrTruncatedWrongValue + CodeMaxPreparedStmtCountReached terror.ErrCode = mysql.ErrMaxPreparedStmtCountReached ) // Variable errors var ( - UnknownStatusVar = terror.ClassVariable.New(CodeUnknownStatusVar, "unknown status variable") - UnknownSystemVar = terror.ClassVariable.New(CodeUnknownSystemVar, mysql.MySQLErrName[mysql.ErrUnknownSystemVariable]) - ErrIncorrectScope = terror.ClassVariable.New(CodeIncorrectScope, mysql.MySQLErrName[mysql.ErrIncorrectGlobalLocalVar]) - ErrUnknownTimeZone = terror.ClassVariable.New(CodeUnknownTimeZone, mysql.MySQLErrName[mysql.ErrUnknownTimeZone]) - ErrReadOnly = terror.ClassVariable.New(CodeReadOnly, "variable is read only") - ErrWrongValueForVar = terror.ClassVariable.New(CodeWrongValueForVar, mysql.MySQLErrName[mysql.ErrWrongValueForVar]) - ErrWrongTypeForVar = terror.ClassVariable.New(CodeWrongTypeForVar, mysql.MySQLErrName[mysql.ErrWrongTypeForVar]) - ErrTruncatedWrongValue = terror.ClassVariable.New(CodeTruncatedWrongValue, mysql.MySQLErrName[mysql.ErrTruncatedWrongValue]) - ErrUnsupportedValueForVar = terror.ClassVariable.New(CodeUnknownStatusVar, "variable '%s' does not yet support value: %s") + UnknownStatusVar = terror.ClassVariable.New(CodeUnknownStatusVar, "unknown status variable") + UnknownSystemVar = terror.ClassVariable.New(CodeUnknownSystemVar, mysql.MySQLErrName[mysql.ErrUnknownSystemVariable]) + ErrIncorrectScope = terror.ClassVariable.New(CodeIncorrectScope, mysql.MySQLErrName[mysql.ErrIncorrectGlobalLocalVar]) + ErrUnknownTimeZone = terror.ClassVariable.New(CodeUnknownTimeZone, mysql.MySQLErrName[mysql.ErrUnknownTimeZone]) + ErrReadOnly = terror.ClassVariable.New(CodeReadOnly, "variable is read only") + ErrWrongValueForVar = terror.ClassVariable.New(CodeWrongValueForVar, mysql.MySQLErrName[mysql.ErrWrongValueForVar]) + ErrWrongTypeForVar = terror.ClassVariable.New(CodeWrongTypeForVar, mysql.MySQLErrName[mysql.ErrWrongTypeForVar]) + ErrTruncatedWrongValue = terror.ClassVariable.New(CodeTruncatedWrongValue, mysql.MySQLErrName[mysql.ErrTruncatedWrongValue]) + ErrMaxPreparedStmtCountReached = terror.ClassVariable.New(CodeMaxPreparedStmtCountReached, mysql.MySQLErrName[mysql.ErrMaxPreparedStmtCountReached]) + ErrUnsupportedValueForVar = terror.ClassVariable.New(CodeUnknownStatusVar, "variable '%s' does not yet support value: %s") ) func init() { @@ -91,13 +93,14 @@ func init() { // Register terror to mysql error map. mySQLErrCodes := map[terror.ErrCode]uint16{ - CodeUnknownSystemVar: mysql.ErrUnknownSystemVariable, - CodeIncorrectScope: mysql.ErrIncorrectGlobalLocalVar, - CodeUnknownTimeZone: mysql.ErrUnknownTimeZone, - CodeReadOnly: mysql.ErrVariableIsReadonly, - CodeWrongValueForVar: mysql.ErrWrongValueForVar, - CodeWrongTypeForVar: mysql.ErrWrongTypeForVar, - CodeTruncatedWrongValue: mysql.ErrTruncatedWrongValue, + CodeUnknownSystemVar: mysql.ErrUnknownSystemVariable, + CodeIncorrectScope: mysql.ErrIncorrectGlobalLocalVar, + CodeUnknownTimeZone: mysql.ErrUnknownTimeZone, + CodeReadOnly: mysql.ErrVariableIsReadonly, + CodeWrongValueForVar: mysql.ErrWrongValueForVar, + CodeWrongTypeForVar: mysql.ErrWrongTypeForVar, + CodeTruncatedWrongValue: mysql.ErrTruncatedWrongValue, + CodeMaxPreparedStmtCountReached: mysql.ErrMaxPreparedStmtCountReached, } terror.ErrClassToMySQLCodes[terror.ClassVariable] = mySQLErrCodes } @@ -516,7 +519,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, "innodb_create_intrinsic", ""}, {ScopeGlobal, "gtid_executed_compression_period", ""}, {ScopeGlobal, "ndb_log_empty_epochs", ""}, - {ScopeGlobal, "max_prepared_stmt_count", "16382"}, + {ScopeGlobal, MaxPreparedStmtCount, strconv.FormatInt(DefMaxPreparedStmtCount, 10)}, {ScopeNone, "have_geometry", "YES"}, {ScopeGlobal | ScopeSession, "optimizer_trace_max_mem_size", "16384"}, {ScopeGlobal | ScopeSession, "net_retry_count", "10"}, @@ -706,6 +709,8 @@ const ( GeneralLog = "general_log" // AvoidTemporalUpgrade is the name for 'avoid_temporal_upgrade' system variable. AvoidTemporalUpgrade = "avoid_temporal_upgrade" + // MaxPreparedStmtCount is the name for 'max_prepared_stmt_count' system variable. + MaxPreparedStmtCount = "max_prepared_stmt_count" // BigTables is the name for 'big_tables' system variable. BigTables = "big_tables" // CheckProxyUsers is the name for 'check_proxy_users' system variable. diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 519631a2ab..7a01356e32 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -240,6 +240,7 @@ const ( DefCurretTS = 0 DefMaxChunkSize = 32 DefDMLBatchSize = 20000 + DefMaxPreparedStmtCount = -1 DefWaitTimeout = 28800 DefTiDBMemQuotaHashJoin = 32 << 30 // 32GB. DefTiDBMemQuotaMergeJoin = 32 << 30 // 32GB. diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index a534490da4..ed601b9f86 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -292,6 +292,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, return checkUInt64SystemVar(name, value, 1024, math.MaxUint64, vars) case WaitTimeout: return checkUInt64SystemVar(name, value, 1, 31536000, vars) + case MaxPreparedStmtCount: + return checkInt64SystemVar(name, value, -1, 1048576, vars) case TimeZone: if strings.EqualFold(value, "SYSTEM") { return "SYSTEM", nil