*: add preparedStmt metric and add limit to max prepareStmt (#8405)
This commit is contained in:
@ -177,8 +177,7 @@ func (e *PrepareExec) Next(ctx context.Context, chk *chunk.Chunk) error {
|
||||
if e.name != "" {
|
||||
vars.PreparedStmtNameToID[e.name] = e.ID
|
||||
}
|
||||
vars.PreparedStmts[e.ID] = prepared
|
||||
return nil
|
||||
return vars.AddPreparedStmt(e.ID, prepared)
|
||||
}
|
||||
|
||||
// ExecuteExec represents an EXECUTE executor.
|
||||
@ -242,7 +241,7 @@ func (e *DeallocateExec) Next(ctx context.Context, chk *chunk.Chunk) error {
|
||||
vars, id, vars.PreparedStmts[id].SchemaVersion,
|
||||
))
|
||||
}
|
||||
delete(vars.PreparedStmts, id)
|
||||
vars.RemovePreparedStmt(id)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -56,6 +56,7 @@ func RegisterMetrics() {
|
||||
prometheus.MustRegister(BatchAddIdxHistogram)
|
||||
prometheus.MustRegister(CampaignOwnerCounter)
|
||||
prometheus.MustRegister(ConnGauge)
|
||||
prometheus.MustRegister(PreparedStmtGauge)
|
||||
prometheus.MustRegister(CriticalErrorCounter)
|
||||
prometheus.MustRegister(DDLCounter)
|
||||
prometheus.MustRegister(DDLWorkerHistogram)
|
||||
|
||||
@ -48,6 +48,13 @@ var (
|
||||
Help: "Number of connections.",
|
||||
})
|
||||
|
||||
PreparedStmtGauge = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "tidb",
|
||||
Subsystem: "server",
|
||||
Name: "prepared_stmts",
|
||||
Help: "number of prepared statements.",
|
||||
})
|
||||
|
||||
ExecuteErrorCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "tidb",
|
||||
|
||||
@ -14,16 +14,20 @@
|
||||
package core_test
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/parser/terror"
|
||||
"github.com/pingcap/tidb/executor"
|
||||
"github.com/pingcap/tidb/infoschema"
|
||||
"github.com/pingcap/tidb/metrics"
|
||||
"github.com/pingcap/tidb/planner/core"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/util/memory"
|
||||
"github.com/pingcap/tidb/util/testkit"
|
||||
"github.com/pingcap/tidb/util/testleak"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
)
|
||||
|
||||
@ -217,3 +221,60 @@ func (s *testPrepareSuite) TestPrepareCacheNow(c *C) {
|
||||
rs = tk.MustQuery("execute stmt4").Rows()
|
||||
c.Assert(rs[0][0].(string), Equals, rs[0][2].(string))
|
||||
}
|
||||
|
||||
func (s *testPrepareSuite) TestPrepareOverMaxPreparedStmtCount(c *C) {
|
||||
defer testleak.AfterTest(c)()
|
||||
store, dom, err := newStoreWithBootstrap()
|
||||
c.Assert(err, IsNil)
|
||||
tk := testkit.NewTestKit(c, store)
|
||||
defer func() {
|
||||
dom.Close()
|
||||
store.Close()
|
||||
}()
|
||||
tk.MustExec("use test")
|
||||
|
||||
// test prepare and deallocate.
|
||||
prePrepared := readGaugeInt(metrics.PreparedStmtGauge)
|
||||
tk.MustExec(`prepare stmt1 from "select 1"`)
|
||||
onePrepared := readGaugeInt(metrics.PreparedStmtGauge)
|
||||
c.Assert(prePrepared+1, Equals, onePrepared)
|
||||
tk.MustExec(`deallocate prepare stmt1`)
|
||||
deallocPrepared := readGaugeInt(metrics.PreparedStmtGauge)
|
||||
c.Assert(prePrepared, Equals, deallocPrepared)
|
||||
|
||||
// test change global limit and make it affected in test session.
|
||||
tk.MustQuery("select @@max_prepared_stmt_count").Check(testkit.Rows("-1"))
|
||||
tk.MustExec("set @@global.max_prepared_stmt_count = 2")
|
||||
tk.MustQuery("select @@global.max_prepared_stmt_count").Check(testkit.Rows("2"))
|
||||
time.Sleep(3 * time.Second) // renew a session after 2 sec
|
||||
|
||||
// test close session to give up all prepared stmt
|
||||
tk.MustExec(`prepare stmt2 from "select 1"`)
|
||||
prePrepared = readGaugeInt(metrics.PreparedStmtGauge)
|
||||
tk.Se.Close()
|
||||
drawPrepared := readGaugeInt(metrics.PreparedStmtGauge)
|
||||
c.Assert(prePrepared-1, Equals, drawPrepared)
|
||||
|
||||
// test meet max limit.
|
||||
tk.Se = nil
|
||||
tk.MustQuery("select @@max_prepared_stmt_count").Check(testkit.Rows("2"))
|
||||
for i := 1; ; i++ {
|
||||
prePrepared = readGaugeInt(metrics.PreparedStmtGauge)
|
||||
if prePrepared >= 2 {
|
||||
_, err = tk.Exec(`prepare stmt` + strconv.Itoa(i) + ` from "select 1"`)
|
||||
c.Assert(terror.ErrorEqual(err, variable.ErrMaxPreparedStmtCountReached), IsTrue)
|
||||
break
|
||||
} else {
|
||||
tk.Exec(`prepare stmt` + strconv.Itoa(i) + ` from "select 1"`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func readGaugeInt(g prometheus.Gauge) int {
|
||||
ch := make(chan prometheus.Metric, 1)
|
||||
g.Collect(ch)
|
||||
m := <-ch
|
||||
mm := &dto.Metric{}
|
||||
m.Write(mm)
|
||||
return int(mm.GetGauge().GetValue())
|
||||
}
|
||||
|
||||
@ -189,7 +189,7 @@ func (s *session) cleanRetryInfo() {
|
||||
}
|
||||
s.PreparedPlanCache().Delete(cacheKey)
|
||||
}
|
||||
delete(s.sessionVars.PreparedStmts, stmtID)
|
||||
s.sessionVars.RemovePreparedStmt(stmtID)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1097,6 +1097,9 @@ func (s *session) Close() {
|
||||
if err := s.RollbackTxn(ctx); err != nil {
|
||||
log.Error("session Close error:", errors.ErrorStack(err))
|
||||
}
|
||||
if s.sessionVars != nil {
|
||||
s.sessionVars.WithdrawAllPreparedStmt()
|
||||
}
|
||||
}
|
||||
|
||||
// GetSessionVars implements the context.Context interface.
|
||||
@ -1381,6 +1384,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab
|
||||
variable.TimeZone + quoteCommaQuote +
|
||||
variable.BlockEncryptionMode + quoteCommaQuote +
|
||||
variable.WaitTimeout + quoteCommaQuote +
|
||||
variable.MaxPreparedStmtCount + quoteCommaQuote +
|
||||
/* TiDB specific global variables: */
|
||||
variable.TiDBSkipUTF8Check + quoteCommaQuote +
|
||||
variable.TiDBIndexJoinBatchSize + quoteCommaQuote +
|
||||
|
||||
@ -16,6 +16,7 @@ package variable
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -31,6 +32,7 @@ import (
|
||||
"github.com/pingcap/tidb/config"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/meta/autoid"
|
||||
"github.com/pingcap/tidb/metrics"
|
||||
"github.com/pingcap/tidb/sessionctx/stmtctx"
|
||||
"github.com/pingcap/tidb/types"
|
||||
"github.com/pingcap/tidb/util/chunk"
|
||||
@ -44,6 +46,8 @@ const (
|
||||
codeSnapshotTooOld terror.ErrCode = 3
|
||||
)
|
||||
|
||||
var preparedStmtCount int64
|
||||
|
||||
// Error instances.
|
||||
var (
|
||||
errCantGetValidID = terror.ClassVariable.New(codeCantGetValidID, "cannot get valid auto-increment id in retry")
|
||||
@ -531,6 +535,46 @@ func (s *SessionVars) setDDLReorgPriority(val string) {
|
||||
}
|
||||
}
|
||||
|
||||
// AddPreparedStmt adds prepareStmt to current session and count in global.
|
||||
func (s *SessionVars) AddPreparedStmt(stmtID uint32, stmt *ast.Prepared) error {
|
||||
if _, exists := s.PreparedStmts[stmtID]; !exists {
|
||||
valStr, _ := s.GetSystemVar(MaxPreparedStmtCount)
|
||||
maxPreparedStmtCount, err := strconv.ParseInt(valStr, 10, 64)
|
||||
if err != nil {
|
||||
maxPreparedStmtCount = DefMaxPreparedStmtCount
|
||||
}
|
||||
newPreparedStmtCount := atomic.AddInt64(&preparedStmtCount, 1)
|
||||
if maxPreparedStmtCount >= 0 && newPreparedStmtCount > maxPreparedStmtCount {
|
||||
atomic.AddInt64(&preparedStmtCount, -1)
|
||||
return ErrMaxPreparedStmtCountReached.GenWithStackByArgs(maxPreparedStmtCount)
|
||||
}
|
||||
metrics.PreparedStmtGauge.Set(float64(newPreparedStmtCount))
|
||||
}
|
||||
s.PreparedStmts[stmtID] = stmt
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemovePreparedStmt removes preparedStmt from current session and decrease count in global.
|
||||
func (s *SessionVars) RemovePreparedStmt(stmtID uint32) {
|
||||
_, exists := s.PreparedStmts[stmtID]
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
delete(s.PreparedStmts, stmtID)
|
||||
afterMinus := atomic.AddInt64(&preparedStmtCount, -1)
|
||||
metrics.PreparedStmtGauge.Set(float64(afterMinus))
|
||||
}
|
||||
|
||||
// WithdrawAllPreparedStmt remove all preparedStmt in current session and decrease count in global.
|
||||
func (s *SessionVars) WithdrawAllPreparedStmt() {
|
||||
psCount := len(s.PreparedStmts)
|
||||
if psCount == 0 {
|
||||
return
|
||||
}
|
||||
afterMinus := atomic.AddInt64(&preparedStmtCount, -int64(psCount))
|
||||
metrics.PreparedStmtGauge.Set(float64(afterMinus))
|
||||
}
|
||||
|
||||
// SetSystemVar sets the value of a system variable.
|
||||
func (s *SessionVars) SetSystemVar(name string, val string) error {
|
||||
switch name {
|
||||
|
||||
@ -59,27 +59,29 @@ func GetSysVar(name string) *SysVar {
|
||||
|
||||
// Variable error codes.
|
||||
const (
|
||||
CodeUnknownStatusVar terror.ErrCode = 1
|
||||
CodeUnknownSystemVar terror.ErrCode = mysql.ErrUnknownSystemVariable
|
||||
CodeIncorrectScope terror.ErrCode = mysql.ErrIncorrectGlobalLocalVar
|
||||
CodeUnknownTimeZone terror.ErrCode = mysql.ErrUnknownTimeZone
|
||||
CodeReadOnly terror.ErrCode = mysql.ErrVariableIsReadonly
|
||||
CodeWrongValueForVar terror.ErrCode = mysql.ErrWrongValueForVar
|
||||
CodeWrongTypeForVar terror.ErrCode = mysql.ErrWrongTypeForVar
|
||||
CodeTruncatedWrongValue terror.ErrCode = mysql.ErrTruncatedWrongValue
|
||||
CodeUnknownStatusVar terror.ErrCode = 1
|
||||
CodeUnknownSystemVar terror.ErrCode = mysql.ErrUnknownSystemVariable
|
||||
CodeIncorrectScope terror.ErrCode = mysql.ErrIncorrectGlobalLocalVar
|
||||
CodeUnknownTimeZone terror.ErrCode = mysql.ErrUnknownTimeZone
|
||||
CodeReadOnly terror.ErrCode = mysql.ErrVariableIsReadonly
|
||||
CodeWrongValueForVar terror.ErrCode = mysql.ErrWrongValueForVar
|
||||
CodeWrongTypeForVar terror.ErrCode = mysql.ErrWrongTypeForVar
|
||||
CodeTruncatedWrongValue terror.ErrCode = mysql.ErrTruncatedWrongValue
|
||||
CodeMaxPreparedStmtCountReached terror.ErrCode = mysql.ErrMaxPreparedStmtCountReached
|
||||
)
|
||||
|
||||
// Variable errors
|
||||
var (
|
||||
UnknownStatusVar = terror.ClassVariable.New(CodeUnknownStatusVar, "unknown status variable")
|
||||
UnknownSystemVar = terror.ClassVariable.New(CodeUnknownSystemVar, mysql.MySQLErrName[mysql.ErrUnknownSystemVariable])
|
||||
ErrIncorrectScope = terror.ClassVariable.New(CodeIncorrectScope, mysql.MySQLErrName[mysql.ErrIncorrectGlobalLocalVar])
|
||||
ErrUnknownTimeZone = terror.ClassVariable.New(CodeUnknownTimeZone, mysql.MySQLErrName[mysql.ErrUnknownTimeZone])
|
||||
ErrReadOnly = terror.ClassVariable.New(CodeReadOnly, "variable is read only")
|
||||
ErrWrongValueForVar = terror.ClassVariable.New(CodeWrongValueForVar, mysql.MySQLErrName[mysql.ErrWrongValueForVar])
|
||||
ErrWrongTypeForVar = terror.ClassVariable.New(CodeWrongTypeForVar, mysql.MySQLErrName[mysql.ErrWrongTypeForVar])
|
||||
ErrTruncatedWrongValue = terror.ClassVariable.New(CodeTruncatedWrongValue, mysql.MySQLErrName[mysql.ErrTruncatedWrongValue])
|
||||
ErrUnsupportedValueForVar = terror.ClassVariable.New(CodeUnknownStatusVar, "variable '%s' does not yet support value: %s")
|
||||
UnknownStatusVar = terror.ClassVariable.New(CodeUnknownStatusVar, "unknown status variable")
|
||||
UnknownSystemVar = terror.ClassVariable.New(CodeUnknownSystemVar, mysql.MySQLErrName[mysql.ErrUnknownSystemVariable])
|
||||
ErrIncorrectScope = terror.ClassVariable.New(CodeIncorrectScope, mysql.MySQLErrName[mysql.ErrIncorrectGlobalLocalVar])
|
||||
ErrUnknownTimeZone = terror.ClassVariable.New(CodeUnknownTimeZone, mysql.MySQLErrName[mysql.ErrUnknownTimeZone])
|
||||
ErrReadOnly = terror.ClassVariable.New(CodeReadOnly, "variable is read only")
|
||||
ErrWrongValueForVar = terror.ClassVariable.New(CodeWrongValueForVar, mysql.MySQLErrName[mysql.ErrWrongValueForVar])
|
||||
ErrWrongTypeForVar = terror.ClassVariable.New(CodeWrongTypeForVar, mysql.MySQLErrName[mysql.ErrWrongTypeForVar])
|
||||
ErrTruncatedWrongValue = terror.ClassVariable.New(CodeTruncatedWrongValue, mysql.MySQLErrName[mysql.ErrTruncatedWrongValue])
|
||||
ErrMaxPreparedStmtCountReached = terror.ClassVariable.New(CodeMaxPreparedStmtCountReached, mysql.MySQLErrName[mysql.ErrMaxPreparedStmtCountReached])
|
||||
ErrUnsupportedValueForVar = terror.ClassVariable.New(CodeUnknownStatusVar, "variable '%s' does not yet support value: %s")
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -91,13 +93,14 @@ func init() {
|
||||
|
||||
// Register terror to mysql error map.
|
||||
mySQLErrCodes := map[terror.ErrCode]uint16{
|
||||
CodeUnknownSystemVar: mysql.ErrUnknownSystemVariable,
|
||||
CodeIncorrectScope: mysql.ErrIncorrectGlobalLocalVar,
|
||||
CodeUnknownTimeZone: mysql.ErrUnknownTimeZone,
|
||||
CodeReadOnly: mysql.ErrVariableIsReadonly,
|
||||
CodeWrongValueForVar: mysql.ErrWrongValueForVar,
|
||||
CodeWrongTypeForVar: mysql.ErrWrongTypeForVar,
|
||||
CodeTruncatedWrongValue: mysql.ErrTruncatedWrongValue,
|
||||
CodeUnknownSystemVar: mysql.ErrUnknownSystemVariable,
|
||||
CodeIncorrectScope: mysql.ErrIncorrectGlobalLocalVar,
|
||||
CodeUnknownTimeZone: mysql.ErrUnknownTimeZone,
|
||||
CodeReadOnly: mysql.ErrVariableIsReadonly,
|
||||
CodeWrongValueForVar: mysql.ErrWrongValueForVar,
|
||||
CodeWrongTypeForVar: mysql.ErrWrongTypeForVar,
|
||||
CodeTruncatedWrongValue: mysql.ErrTruncatedWrongValue,
|
||||
CodeMaxPreparedStmtCountReached: mysql.ErrMaxPreparedStmtCountReached,
|
||||
}
|
||||
terror.ErrClassToMySQLCodes[terror.ClassVariable] = mySQLErrCodes
|
||||
}
|
||||
@ -516,7 +519,7 @@ var defaultSysVars = []*SysVar{
|
||||
{ScopeSession, "innodb_create_intrinsic", ""},
|
||||
{ScopeGlobal, "gtid_executed_compression_period", ""},
|
||||
{ScopeGlobal, "ndb_log_empty_epochs", ""},
|
||||
{ScopeGlobal, "max_prepared_stmt_count", "16382"},
|
||||
{ScopeGlobal, MaxPreparedStmtCount, strconv.FormatInt(DefMaxPreparedStmtCount, 10)},
|
||||
{ScopeNone, "have_geometry", "YES"},
|
||||
{ScopeGlobal | ScopeSession, "optimizer_trace_max_mem_size", "16384"},
|
||||
{ScopeGlobal | ScopeSession, "net_retry_count", "10"},
|
||||
@ -706,6 +709,8 @@ const (
|
||||
GeneralLog = "general_log"
|
||||
// AvoidTemporalUpgrade is the name for 'avoid_temporal_upgrade' system variable.
|
||||
AvoidTemporalUpgrade = "avoid_temporal_upgrade"
|
||||
// MaxPreparedStmtCount is the name for 'max_prepared_stmt_count' system variable.
|
||||
MaxPreparedStmtCount = "max_prepared_stmt_count"
|
||||
// BigTables is the name for 'big_tables' system variable.
|
||||
BigTables = "big_tables"
|
||||
// CheckProxyUsers is the name for 'check_proxy_users' system variable.
|
||||
|
||||
@ -240,6 +240,7 @@ const (
|
||||
DefCurretTS = 0
|
||||
DefMaxChunkSize = 32
|
||||
DefDMLBatchSize = 20000
|
||||
DefMaxPreparedStmtCount = -1
|
||||
DefWaitTimeout = 28800
|
||||
DefTiDBMemQuotaHashJoin = 32 << 30 // 32GB.
|
||||
DefTiDBMemQuotaMergeJoin = 32 << 30 // 32GB.
|
||||
|
||||
@ -292,6 +292,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
|
||||
return checkUInt64SystemVar(name, value, 1024, math.MaxUint64, vars)
|
||||
case WaitTimeout:
|
||||
return checkUInt64SystemVar(name, value, 1, 31536000, vars)
|
||||
case MaxPreparedStmtCount:
|
||||
return checkInt64SystemVar(name, value, -1, 1048576, vars)
|
||||
case TimeZone:
|
||||
if strings.EqualFold(value, "SYSTEM") {
|
||||
return "SYSTEM", nil
|
||||
|
||||
Reference in New Issue
Block a user