infoschema, util/stmtsummary: enhance statements_summary. (#25031)
This commit is contained in:
@ -1131,8 +1131,8 @@ var tableStatementsSummaryCols = []columnInfo{
|
||||
{name: "SUMMARY_BEGIN_TIME", tp: mysql.TypeTimestamp, size: 26, flag: mysql.NotNullFlag, comment: "Begin time of this summary"},
|
||||
{name: "SUMMARY_END_TIME", tp: mysql.TypeTimestamp, size: 26, flag: mysql.NotNullFlag, comment: "End time of this summary"},
|
||||
{name: "STMT_TYPE", tp: mysql.TypeVarchar, size: 64, flag: mysql.NotNullFlag, comment: "Statement type"},
|
||||
{name: "SCHEMA_NAME", tp: mysql.TypeVarchar, size: 64, flag: mysql.NotNullFlag, comment: "Current schema"},
|
||||
{name: "DIGEST", tp: mysql.TypeVarchar, size: 64, flag: mysql.NotNullFlag},
|
||||
{name: "SCHEMA_NAME", tp: mysql.TypeVarchar, size: 64, comment: "Current schema"},
|
||||
{name: "DIGEST", tp: mysql.TypeVarchar, size: 64},
|
||||
{name: "DIGEST_TEXT", tp: mysql.TypeBlob, size: types.UnspecifiedLength, flag: mysql.NotNullFlag, comment: "Normalized statement"},
|
||||
{name: "TABLE_NAMES", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "Involved tables"},
|
||||
{name: "INDEX_NAMES", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "Used indices"},
|
||||
|
||||
@ -1401,6 +1401,71 @@ func (s *testTableSuite) TestSimpleStmtSummaryEvictedCount(c *C) {
|
||||
tk.MustExec("set global tidb_stmt_summary_refresh_interval = 1800")
|
||||
}
|
||||
|
||||
func (s *testTableSuite) TestStmtSummaryTableOther(c *C) {
|
||||
interval := int64(1800)
|
||||
tk := s.newTestKitWithRoot(c)
|
||||
tk.MustExec(fmt.Sprintf("set global tidb_stmt_summary_refresh_interval=%v", interval))
|
||||
tk.MustExec("set global tidb_enable_stmt_summary=0")
|
||||
tk.MustExec("set global tidb_enable_stmt_summary=1")
|
||||
// set stmt size to 1
|
||||
// first sql
|
||||
tk.MustExec("set global tidb_stmt_summary_max_stmt_count=1")
|
||||
defer tk.MustExec("set global tidb_stmt_summary_max_stmt_count=100")
|
||||
// second sql, evict first sql from stmt_summary
|
||||
tk.MustExec("show databases;")
|
||||
// third sql, evict second sql from stmt_summary
|
||||
tk.MustQuery("SELECT DIGEST_TEXT, DIGEST FROM `INFORMATION_SCHEMA`.`STATEMENTS_SUMMARY`;").
|
||||
Check(testkit.Rows(
|
||||
// digest in cache
|
||||
// "show databases ;"
|
||||
"show databases ; dcd020298c5f79e8dc9d63b3098083601614a04a52db458738347d15ea5712a1",
|
||||
// digest evicted
|
||||
" <nil>",
|
||||
))
|
||||
// forth sql, evict third sql from stmt_summary
|
||||
tk.MustQuery("SELECT SCHEMA_NAME FROM `INFORMATION_SCHEMA`.`STATEMENTS_SUMMARY`;").
|
||||
Check(testkit.Rows(
|
||||
// digest in cache
|
||||
"test", // select xx from yy;
|
||||
// digest evicted
|
||||
"<nil>",
|
||||
))
|
||||
}
|
||||
|
||||
func (s *testTableSuite) TestStmtSummaryHistoryTableOther(c *C) {
|
||||
tk := s.newTestKitWithRoot(c)
|
||||
// disable refreshing summary
|
||||
interval := int64(9999)
|
||||
tk.MustExec("set global tidb_stmt_summary_max_stmt_count = 1")
|
||||
defer tk.MustExec("set global tidb_stmt_summary_max_stmt_count = 100")
|
||||
tk.MustExec(fmt.Sprintf("set global tidb_stmt_summary_refresh_interval = %v", interval))
|
||||
defer tk.MustExec(fmt.Sprintf("set global tidb_stmt_summary_refresh_interval = %v", 1800))
|
||||
|
||||
tk.MustExec("set global tidb_enable_stmt_summary = 0")
|
||||
tk.MustExec("set global tidb_enable_stmt_summary = 1")
|
||||
// first sql
|
||||
tk.MustExec("set global tidb_stmt_summary_max_stmt_count=1")
|
||||
// second sql, evict first sql from stmt_summary
|
||||
tk.MustExec("show databases;")
|
||||
// third sql, evict second sql from stmt_summary
|
||||
tk.MustQuery("SELECT DIGEST_TEXT, DIGEST FROM `INFORMATION_SCHEMA`.`STATEMENTS_SUMMARY_HISTORY`;").
|
||||
Check(testkit.Rows(
|
||||
// digest in cache
|
||||
// "show databases ;"
|
||||
"show databases ; dcd020298c5f79e8dc9d63b3098083601614a04a52db458738347d15ea5712a1",
|
||||
// digest evicted
|
||||
" <nil>",
|
||||
))
|
||||
// forth sql, evict third sql from stmt_summary
|
||||
tk.MustQuery("SELECT SCHEMA_NAME FROM `INFORMATION_SCHEMA`.`STATEMENTS_SUMMARY_HISTORY`;").
|
||||
Check(testkit.Rows(
|
||||
// digest in cache
|
||||
"test", // select xx from yy;
|
||||
// digest evicted
|
||||
"<nil>",
|
||||
))
|
||||
}
|
||||
|
||||
func (s *testTableSuite) TestPerformanceSchemaforPlanCache(c *C) {
|
||||
orgEnable := plannercore.PreparedPlanCacheEnabled()
|
||||
defer func() {
|
||||
|
||||
@ -15,6 +15,8 @@ package stmtsummary
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/parser/mysql"
|
||||
@ -23,6 +25,7 @@ import (
|
||||
|
||||
// stmtSummaryByDigestEvicted contents digests evicted from stmtSummaryByDigestMap
|
||||
type stmtSummaryByDigestEvicted struct {
|
||||
sync.Mutex
|
||||
// record evicted data in intervals
|
||||
// latest history data is Back()
|
||||
history *list.List
|
||||
@ -34,8 +37,10 @@ type stmtSummaryByDigestEvictedElement struct {
|
||||
beginTime int64
|
||||
// endTime is the end time of current interval
|
||||
endTime int64
|
||||
// *Kinds* of digest being evicted
|
||||
// digestKeyMap contains *Kinds* of digest being evicted
|
||||
digestKeyMap map[string]struct{}
|
||||
// otherSummary contains summed up information of evicted elements
|
||||
otherSummary *stmtSummaryByDigestElement
|
||||
}
|
||||
|
||||
// spawn a new pointer to stmtSummaryByDigestEvicted
|
||||
@ -51,6 +56,14 @@ func newStmtSummaryByDigestEvictedElement(beginTime int64, endTime int64) *stmtS
|
||||
beginTime: beginTime,
|
||||
endTime: endTime,
|
||||
digestKeyMap: make(map[string]struct{}),
|
||||
otherSummary: &stmtSummaryByDigestElement{
|
||||
beginTime: beginTime,
|
||||
endTime: endTime,
|
||||
authUsers: make(map[string]struct{}),
|
||||
minLatency: time.Duration(math.MaxInt64),
|
||||
backoffTypes: make(map[string]int),
|
||||
firstSeen: time.Unix(endTime, 0),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@ -63,6 +76,9 @@ func (ssbde *stmtSummaryByDigestEvicted) AddEvicted(evictedKey *stmtSummaryByDig
|
||||
evictedValue.Lock()
|
||||
defer evictedValue.Unlock()
|
||||
|
||||
ssbde.Lock()
|
||||
defer ssbde.Unlock()
|
||||
|
||||
if evictedValue.history == nil {
|
||||
return
|
||||
}
|
||||
@ -130,6 +146,8 @@ func (ssbde *stmtSummaryByDigestEvicted) AddEvicted(evictedKey *stmtSummaryByDig
|
||||
|
||||
// Clear up all records in stmtSummaryByDigestEvicted
|
||||
func (ssbde *stmtSummaryByDigestEvicted) Clear() {
|
||||
ssbde.Lock()
|
||||
defer ssbde.Unlock()
|
||||
ssbde.history.Init()
|
||||
}
|
||||
|
||||
@ -137,6 +155,7 @@ func (ssbde *stmtSummaryByDigestEvicted) Clear() {
|
||||
func (seElement *stmtSummaryByDigestEvictedElement) addEvicted(digestKey *stmtSummaryByDigestKey, digestValue *stmtSummaryByDigestElement) {
|
||||
if digestKey != nil {
|
||||
seElement.digestKeyMap[string(digestKey.Hash())] = struct{}{}
|
||||
addInfo(seElement.otherSummary, digestValue)
|
||||
}
|
||||
}
|
||||
|
||||
@ -190,3 +209,211 @@ func (seElement *stmtSummaryByDigestEvictedElement) toEvictedCountDatum() []type
|
||||
func (ssMap *stmtSummaryByDigestMap) ToEvictedCountDatum() [][]types.Datum {
|
||||
return ssMap.other.ToEvictedCountDatum()
|
||||
}
|
||||
|
||||
func (ssbde *stmtSummaryByDigestEvicted) toCurrentDatum() []types.Datum {
|
||||
var seElement *stmtSummaryByDigestEvictedElement
|
||||
|
||||
ssbde.Lock()
|
||||
if ssbde.history.Len() > 0 {
|
||||
seElement = ssbde.history.Back().Value.(*stmtSummaryByDigestEvictedElement)
|
||||
}
|
||||
ssbde.Unlock()
|
||||
|
||||
if seElement == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return seElement.toDatum()
|
||||
}
|
||||
|
||||
func (ssbde *stmtSummaryByDigestEvicted) toHistoryDatum(historySize int) [][]types.Datum {
|
||||
// Collect all history summaries to an array.
|
||||
ssbde.Lock()
|
||||
seElements := ssbde.collectHistorySummaries(historySize)
|
||||
ssbde.Unlock()
|
||||
rows := make([][]types.Datum, 0, len(seElements))
|
||||
|
||||
for _, seElement := range seElements {
|
||||
rows = append(rows, seElement.toDatum())
|
||||
}
|
||||
return rows
|
||||
}
|
||||
|
||||
func (ssbde *stmtSummaryByDigestEvicted) collectHistorySummaries(historySize int) []*stmtSummaryByDigestEvictedElement {
|
||||
lst := make([]*stmtSummaryByDigestEvictedElement, 0, ssbde.history.Len())
|
||||
for element := ssbde.history.Front(); element != nil && len(lst) < historySize; element = element.Next() {
|
||||
seElement := element.Value.(*stmtSummaryByDigestEvictedElement)
|
||||
lst = append(lst, seElement)
|
||||
}
|
||||
return lst
|
||||
}
|
||||
|
||||
func (seElement *stmtSummaryByDigestEvictedElement) toDatum() []types.Datum {
|
||||
return seElement.otherSummary.toDatum(new(stmtSummaryByDigest))
|
||||
}
|
||||
|
||||
// addInfo adds information in addWith into addTo.
|
||||
func addInfo(addTo *stmtSummaryByDigestElement, addWith *stmtSummaryByDigestElement) {
|
||||
addTo.Lock()
|
||||
defer addTo.Unlock()
|
||||
|
||||
// user
|
||||
for user := range addWith.authUsers {
|
||||
addTo.authUsers[user] = struct{}{}
|
||||
}
|
||||
|
||||
// execCount and sumWarnings
|
||||
addTo.execCount += addWith.execCount
|
||||
addTo.sumWarnings += addWith.sumWarnings
|
||||
|
||||
// latency
|
||||
addTo.sumLatency += addWith.sumLatency
|
||||
if addTo.maxLatency < addWith.maxLatency {
|
||||
addTo.maxLatency = addWith.maxLatency
|
||||
}
|
||||
if addTo.minLatency > addWith.minLatency {
|
||||
addTo.minLatency = addWith.minLatency
|
||||
}
|
||||
addTo.sumParseLatency += addWith.sumParseLatency
|
||||
if addTo.maxParseLatency < addWith.maxParseLatency {
|
||||
addTo.maxParseLatency = addWith.maxParseLatency
|
||||
}
|
||||
addTo.sumCompileLatency += addWith.sumCompileLatency
|
||||
if addTo.maxCompileLatency < addWith.maxCompileLatency {
|
||||
addTo.maxCompileLatency = addWith.maxCompileLatency
|
||||
}
|
||||
|
||||
// coprocessor
|
||||
addTo.sumNumCopTasks += addWith.sumNumCopTasks
|
||||
if addTo.maxCopProcessTime < addWith.maxCopProcessTime {
|
||||
addTo.maxCopProcessTime = addWith.maxCopProcessTime
|
||||
addTo.maxCopProcessAddress = addWith.maxCopProcessAddress
|
||||
}
|
||||
if addTo.maxCopWaitTime < addWith.maxCopWaitTime {
|
||||
addTo.maxCopWaitTime = addWith.maxCopWaitTime
|
||||
addTo.maxCopWaitAddress = addWith.maxCopWaitAddress
|
||||
}
|
||||
|
||||
// TiKV
|
||||
addTo.sumProcessTime += addWith.sumProcessTime
|
||||
if addTo.maxProcessTime < addWith.maxProcessTime {
|
||||
addTo.maxProcessTime = addWith.maxProcessTime
|
||||
}
|
||||
addTo.sumWaitTime += addWith.sumWaitTime
|
||||
if addTo.maxWaitTime < addWith.maxWaitTime {
|
||||
addTo.maxWaitTime = addWith.maxWaitTime
|
||||
}
|
||||
addTo.sumBackoffTime += addWith.sumBackoffTime
|
||||
if addTo.maxBackoffTime < addWith.maxBackoffTime {
|
||||
addTo.maxBackoffTime = addWith.maxBackoffTime
|
||||
}
|
||||
|
||||
addTo.sumTotalKeys += addWith.sumTotalKeys
|
||||
if addTo.maxTotalKeys < addWith.maxTotalKeys {
|
||||
addTo.maxTotalKeys = addWith.maxTotalKeys
|
||||
}
|
||||
addTo.sumProcessedKeys += addWith.sumProcessedKeys
|
||||
if addTo.maxProcessedKeys < addWith.maxProcessedKeys {
|
||||
addTo.maxProcessedKeys = addWith.maxProcessedKeys
|
||||
}
|
||||
addTo.sumRocksdbDeleteSkippedCount += addWith.sumRocksdbDeleteSkippedCount
|
||||
if addTo.maxRocksdbDeleteSkippedCount < addWith.maxRocksdbDeleteSkippedCount {
|
||||
addTo.maxRocksdbDeleteSkippedCount = addWith.maxRocksdbDeleteSkippedCount
|
||||
}
|
||||
addTo.sumRocksdbKeySkippedCount += addWith.sumRocksdbKeySkippedCount
|
||||
if addTo.maxRocksdbKeySkippedCount < addWith.maxRocksdbKeySkippedCount {
|
||||
addTo.maxRocksdbKeySkippedCount = addWith.maxRocksdbKeySkippedCount
|
||||
}
|
||||
addTo.sumRocksdbBlockCacheHitCount += addWith.sumRocksdbBlockCacheHitCount
|
||||
if addTo.maxRocksdbBlockCacheHitCount < addWith.maxRocksdbBlockCacheHitCount {
|
||||
addTo.maxRocksdbBlockCacheHitCount = addWith.maxRocksdbBlockCacheHitCount
|
||||
}
|
||||
addTo.sumRocksdbBlockReadCount += addWith.sumRocksdbBlockReadCount
|
||||
if addTo.maxRocksdbBlockReadCount < addWith.maxRocksdbBlockReadCount {
|
||||
addTo.maxRocksdbBlockReadCount = addWith.maxRocksdbBlockReadCount
|
||||
}
|
||||
addTo.sumRocksdbBlockReadByte += addWith.sumRocksdbBlockReadByte
|
||||
if addTo.maxRocksdbBlockReadByte < addWith.maxRocksdbBlockReadByte {
|
||||
addTo.maxRocksdbBlockReadByte = addWith.maxRocksdbBlockReadByte
|
||||
}
|
||||
|
||||
// txn
|
||||
addTo.commitCount += addWith.commitCount
|
||||
addTo.sumPrewriteTime += addWith.sumPrewriteTime
|
||||
if addTo.maxPrewriteTime < addWith.maxPrewriteTime {
|
||||
addTo.maxPrewriteTime = addWith.maxPrewriteTime
|
||||
}
|
||||
addTo.sumCommitTime += addWith.sumCommitTime
|
||||
if addTo.maxCommitTime < addWith.maxCommitTime {
|
||||
addTo.maxCommitTime = addWith.maxCommitTime
|
||||
}
|
||||
addTo.sumGetCommitTsTime += addWith.sumGetCommitTsTime
|
||||
if addTo.maxGetCommitTsTime < addWith.maxGetCommitTsTime {
|
||||
addTo.maxGetCommitTsTime = addWith.maxGetCommitTsTime
|
||||
}
|
||||
addTo.sumCommitBackoffTime += addWith.sumCommitBackoffTime
|
||||
if addTo.maxCommitBackoffTime < addWith.maxCommitBackoffTime {
|
||||
addTo.maxCommitBackoffTime = addWith.maxCommitBackoffTime
|
||||
}
|
||||
addTo.sumResolveLockTime += addWith.sumResolveLockTime
|
||||
if addTo.maxResolveLockTime < addWith.maxResolveLockTime {
|
||||
addTo.maxResolveLockTime = addWith.maxResolveLockTime
|
||||
}
|
||||
addTo.sumLocalLatchTime += addWith.sumLocalLatchTime
|
||||
if addTo.maxLocalLatchTime < addWith.maxLocalLatchTime {
|
||||
addTo.maxLocalLatchTime = addWith.maxLocalLatchTime
|
||||
}
|
||||
addTo.sumWriteKeys += addWith.sumWriteKeys
|
||||
if addTo.maxWriteKeys < addWith.maxWriteKeys {
|
||||
addTo.maxWriteKeys = addWith.maxWriteKeys
|
||||
}
|
||||
addTo.sumWriteSize += addWith.sumWriteSize
|
||||
if addTo.maxWriteSize < addWith.maxWriteSize {
|
||||
addTo.maxWriteSize = addWith.maxWriteSize
|
||||
}
|
||||
addTo.sumPrewriteRegionNum += addWith.sumPrewriteRegionNum
|
||||
if addTo.maxPrewriteRegionNum < addWith.maxPrewriteRegionNum {
|
||||
addTo.maxPrewriteRegionNum = addWith.maxPrewriteRegionNum
|
||||
}
|
||||
addTo.sumTxnRetry += addWith.sumTxnRetry
|
||||
if addTo.maxTxnRetry < addWith.maxTxnRetry {
|
||||
addTo.maxTxnRetry = addWith.maxTxnRetry
|
||||
}
|
||||
addTo.sumBackoffTimes += addWith.sumBackoffTimes
|
||||
for backoffType, backoffValue := range addWith.backoffTypes {
|
||||
_, ok := addTo.backoffTypes[backoffType]
|
||||
if ok {
|
||||
addTo.backoffTypes[backoffType] += backoffValue
|
||||
} else {
|
||||
addTo.backoffTypes[backoffType] = backoffValue
|
||||
}
|
||||
}
|
||||
|
||||
// plan cache
|
||||
addTo.planCacheHits += addWith.planCacheHits
|
||||
|
||||
// other
|
||||
addTo.sumAffectedRows += addWith.sumAffectedRows
|
||||
addTo.sumMem += addWith.sumMem
|
||||
if addTo.maxMem < addWith.maxMem {
|
||||
addTo.maxMem = addWith.maxMem
|
||||
}
|
||||
addTo.sumDisk += addWith.sumDisk
|
||||
if addTo.maxDisk < addWith.maxDisk {
|
||||
addTo.maxDisk = addWith.maxDisk
|
||||
}
|
||||
if addTo.firstSeen.After(addWith.firstSeen) {
|
||||
addTo.firstSeen = addWith.firstSeen
|
||||
}
|
||||
if addTo.lastSeen.Before(addWith.lastSeen) {
|
||||
addTo.lastSeen = addWith.lastSeen
|
||||
}
|
||||
addTo.execRetryCount += addWith.execRetryCount
|
||||
addTo.execRetryTime += addWith.execRetryTime
|
||||
addTo.sumKVTotal += addWith.sumKVTotal
|
||||
addTo.sumPDTotal += addWith.sumPDTotal
|
||||
addTo.sumBackoffTotal += addWith.sumBackoffTotal
|
||||
addTo.sumWriteSQLRespTotal += addWith.sumWriteSQLRespTotal
|
||||
|
||||
addTo.sumErrors += addWith.sumErrors
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@ import (
|
||||
"bytes"
|
||||
"container/list"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
@ -25,6 +26,7 @@ import (
|
||||
"github.com/pingcap/tidb/types"
|
||||
)
|
||||
|
||||
// fake a stmtSummaryByDigest
|
||||
func newInduceSsbd(beginTime int64, endTime int64) *stmtSummaryByDigest {
|
||||
newSsbd := &stmtSummaryByDigest{
|
||||
history: list.New(),
|
||||
@ -32,6 +34,8 @@ func newInduceSsbd(beginTime int64, endTime int64) *stmtSummaryByDigest {
|
||||
newSsbd.history.PushBack(newInduceSsbde(beginTime, endTime))
|
||||
return newSsbd
|
||||
}
|
||||
|
||||
// fake a stmtSummaryByDigestElement
|
||||
func newInduceSsbde(beginTime int64, endTime int64) *stmtSummaryByDigestElement {
|
||||
newSsbde := &stmtSummaryByDigestElement{
|
||||
beginTime: beginTime,
|
||||
@ -58,7 +62,7 @@ func (s *testStmtSummarySuite) TestMapToEvictedCountDatum(c *C) {
|
||||
interval := ssMap.refreshInterval()
|
||||
ssMap.beginTimeForCurInterval = now + interval
|
||||
|
||||
// set summaryMap capacity to 1.
|
||||
// set summaryMap's capacity to 1.
|
||||
err := ssMap.summaryMap.SetCapacity(1)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
@ -94,7 +98,7 @@ func (s *testStmtSummarySuite) TestMapToEvictedCountDatum(c *C) {
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
ssMap.beginTimeForCurInterval = now + interval
|
||||
// insert one statement every other interval.
|
||||
// insert one statement per interval.
|
||||
for i := 0; i < 50; i++ {
|
||||
ssMap.AddStatement(generateAnyExecInfo())
|
||||
ssMap.beginTimeForCurInterval += interval * 2
|
||||
@ -239,7 +243,7 @@ func (s *testStmtSummarySuite) TestStmtSummaryByDigestEvictedElement(c *C) {
|
||||
}
|
||||
|
||||
// test stmtSummaryByDigestEvicted.addEvicted
|
||||
// test evicted count's detail
|
||||
// test stmtSummaryByDigestEvicted.toEvictedCountDatum (single and multiple intervals)
|
||||
func (s *testStmtSummarySuite) TestEvictedCountDetailed(c *C) {
|
||||
ssMap := newStmtSummaryByDigestMap()
|
||||
ssMap.Clear()
|
||||
@ -304,6 +308,12 @@ func (s *testStmtSummarySuite) TestEvictedCountDetailed(c *C) {
|
||||
c.Assert(other.history.Len(), Equals, 0)
|
||||
}
|
||||
|
||||
func (s *testStmtSummarySuite) TestEvictedElementToDatum(c *C) {
|
||||
seElement := newStmtSummaryByDigestEvictedElement(0, 1)
|
||||
datum0 := seElement.toDatum()
|
||||
c.Assert(datum0, NotNil)
|
||||
}
|
||||
|
||||
func (s *testStmtSummarySuite) TestNewStmtSummaryByDigestEvictedElement(c *C) {
|
||||
now := time.Now().Unix()
|
||||
end := now + 60
|
||||
@ -318,6 +328,300 @@ func (s *testStmtSummarySuite) TestStmtSummaryByDigestEvicted(c *C) {
|
||||
c.Assert(stmtEvicted.history.Len(), Equals, 0)
|
||||
}
|
||||
|
||||
// test addInfo function
|
||||
func (s *testStmtSummarySuite) TestAddInfo(c *C) {
|
||||
now := time.Now().Unix()
|
||||
addTo := stmtSummaryByDigestElement{
|
||||
// user
|
||||
authUsers: map[string]struct{}{"a": {}},
|
||||
|
||||
// execCount and sumWarnings
|
||||
execCount: 3,
|
||||
sumWarnings: 8,
|
||||
|
||||
// latency
|
||||
sumLatency: 8,
|
||||
maxLatency: 5,
|
||||
minLatency: 1,
|
||||
sumParseLatency: 3,
|
||||
maxParseLatency: 2,
|
||||
sumCompileLatency: 3,
|
||||
maxCompileLatency: 2,
|
||||
|
||||
// coprocessor
|
||||
sumNumCopTasks: 4,
|
||||
maxCopProcessTime: 4,
|
||||
maxCopProcessAddress: "19.19.8.10",
|
||||
maxCopWaitTime: 4,
|
||||
maxCopWaitAddress: "19.19.8.10",
|
||||
|
||||
// TiKV
|
||||
sumProcessTime: 1,
|
||||
maxProcessTime: 1,
|
||||
sumWaitTime: 2,
|
||||
maxWaitTime: 1,
|
||||
sumBackoffTime: 2,
|
||||
maxBackoffTime: 2,
|
||||
|
||||
sumTotalKeys: 3,
|
||||
maxTotalKeys: 2,
|
||||
sumProcessedKeys: 8,
|
||||
maxProcessedKeys: 4,
|
||||
sumRocksdbDeleteSkippedCount: 8,
|
||||
maxRocksdbDeleteSkippedCount: 2,
|
||||
|
||||
sumRocksdbKeySkippedCount: 8,
|
||||
maxRocksdbKeySkippedCount: 3,
|
||||
sumRocksdbBlockCacheHitCount: 8,
|
||||
maxRocksdbBlockCacheHitCount: 3,
|
||||
sumRocksdbBlockReadCount: 3,
|
||||
maxRocksdbBlockReadCount: 3,
|
||||
sumRocksdbBlockReadByte: 4,
|
||||
maxRocksdbBlockReadByte: 4,
|
||||
|
||||
// txn
|
||||
commitCount: 8,
|
||||
sumPrewriteTime: 3,
|
||||
maxPrewriteTime: 3,
|
||||
sumCommitTime: 8,
|
||||
maxCommitTime: 5,
|
||||
sumGetCommitTsTime: 8,
|
||||
maxGetCommitTsTime: 8,
|
||||
sumCommitBackoffTime: 8,
|
||||
maxCommitBackoffTime: 8,
|
||||
|
||||
sumResolveLockTime: 8,
|
||||
maxResolveLockTime: 8,
|
||||
sumLocalLatchTime: 8,
|
||||
maxLocalLatchTime: 8,
|
||||
sumWriteKeys: 8,
|
||||
maxWriteKeys: 8,
|
||||
sumWriteSize: 8,
|
||||
maxWriteSize: 8,
|
||||
sumPrewriteRegionNum: 8,
|
||||
maxPrewriteRegionNum: 8,
|
||||
sumTxnRetry: 8,
|
||||
maxTxnRetry: 8,
|
||||
sumBackoffTimes: 8,
|
||||
backoffTypes: map[string]int{},
|
||||
|
||||
// plan cache
|
||||
planCacheHits: 8,
|
||||
|
||||
// other
|
||||
sumAffectedRows: 8,
|
||||
sumMem: 8,
|
||||
maxMem: 8,
|
||||
sumDisk: 8,
|
||||
maxDisk: 8,
|
||||
firstSeen: time.Unix(now-10, 0),
|
||||
lastSeen: time.Unix(now-8, 0),
|
||||
execRetryCount: 8,
|
||||
execRetryTime: 8,
|
||||
sumKVTotal: 2,
|
||||
sumPDTotal: 2,
|
||||
sumBackoffTotal: 2,
|
||||
sumWriteSQLRespTotal: 100,
|
||||
sumErrors: 8,
|
||||
}
|
||||
|
||||
addWith := stmtSummaryByDigestElement{
|
||||
// user
|
||||
authUsers: map[string]struct{}{"a": {}},
|
||||
|
||||
// execCount and sumWarnings
|
||||
execCount: 3,
|
||||
sumWarnings: 8,
|
||||
|
||||
// latency
|
||||
sumLatency: 8,
|
||||
maxLatency: 5,
|
||||
minLatency: 1,
|
||||
sumParseLatency: 3,
|
||||
maxParseLatency: 2,
|
||||
sumCompileLatency: 3,
|
||||
maxCompileLatency: 2,
|
||||
|
||||
// coprocessor
|
||||
sumNumCopTasks: 4,
|
||||
maxCopProcessTime: 4,
|
||||
maxCopProcessAddress: "19.19.8.10",
|
||||
maxCopWaitTime: 4,
|
||||
maxCopWaitAddress: "19.19.8.10",
|
||||
|
||||
// TiKV
|
||||
sumProcessTime: 1,
|
||||
maxProcessTime: 1,
|
||||
sumWaitTime: 2,
|
||||
maxWaitTime: 1,
|
||||
sumBackoffTime: 2,
|
||||
maxBackoffTime: 2,
|
||||
|
||||
sumTotalKeys: 3,
|
||||
maxTotalKeys: 2,
|
||||
sumProcessedKeys: 8,
|
||||
maxProcessedKeys: 4,
|
||||
sumRocksdbDeleteSkippedCount: 8,
|
||||
maxRocksdbDeleteSkippedCount: 2,
|
||||
|
||||
sumRocksdbKeySkippedCount: 8,
|
||||
maxRocksdbKeySkippedCount: 3,
|
||||
sumRocksdbBlockCacheHitCount: 8,
|
||||
maxRocksdbBlockCacheHitCount: 3,
|
||||
sumRocksdbBlockReadCount: 3,
|
||||
maxRocksdbBlockReadCount: 3,
|
||||
sumRocksdbBlockReadByte: 4,
|
||||
maxRocksdbBlockReadByte: 4,
|
||||
|
||||
// txn
|
||||
commitCount: 8,
|
||||
sumPrewriteTime: 3,
|
||||
maxPrewriteTime: 3,
|
||||
sumCommitTime: 8,
|
||||
maxCommitTime: 5,
|
||||
sumGetCommitTsTime: 8,
|
||||
maxGetCommitTsTime: 8,
|
||||
sumCommitBackoffTime: 8,
|
||||
maxCommitBackoffTime: 8,
|
||||
|
||||
sumResolveLockTime: 8,
|
||||
maxResolveLockTime: 8,
|
||||
sumLocalLatchTime: 8,
|
||||
maxLocalLatchTime: 8,
|
||||
sumWriteKeys: 8,
|
||||
maxWriteKeys: 8,
|
||||
sumWriteSize: 8,
|
||||
maxWriteSize: 8,
|
||||
sumPrewriteRegionNum: 8,
|
||||
maxPrewriteRegionNum: 8,
|
||||
sumTxnRetry: 8,
|
||||
maxTxnRetry: 8,
|
||||
sumBackoffTimes: 8,
|
||||
backoffTypes: map[string]int{},
|
||||
|
||||
// plan cache
|
||||
planCacheHits: 8,
|
||||
|
||||
// other
|
||||
sumAffectedRows: 8,
|
||||
sumMem: 8,
|
||||
maxMem: 8,
|
||||
sumDisk: 8,
|
||||
maxDisk: 8,
|
||||
firstSeen: time.Unix(now-10, 0),
|
||||
lastSeen: time.Unix(now-8, 0),
|
||||
execRetryCount: 8,
|
||||
execRetryTime: 8,
|
||||
sumKVTotal: 2,
|
||||
sumPDTotal: 2,
|
||||
sumBackoffTotal: 2,
|
||||
sumWriteSQLRespTotal: 100,
|
||||
sumErrors: 8,
|
||||
}
|
||||
addWith.authUsers["b"] = struct{}{}
|
||||
addWith.maxCopProcessTime = 15
|
||||
addWith.maxCopProcessAddress = "1.14.5.14"
|
||||
addWith.firstSeen = time.Unix(now-20, 0)
|
||||
addWith.lastSeen = time.Unix(now, 0)
|
||||
|
||||
addInfo(&addTo, &addWith)
|
||||
|
||||
expectedSum := stmtSummaryByDigestElement{
|
||||
// user
|
||||
authUsers: map[string]struct{}{"a": {}, "b": {}},
|
||||
|
||||
// execCount and sumWarnings
|
||||
execCount: 6,
|
||||
sumWarnings: 16,
|
||||
|
||||
// latency
|
||||
sumLatency: 16,
|
||||
maxLatency: 5,
|
||||
minLatency: 1,
|
||||
sumParseLatency: 6,
|
||||
maxParseLatency: 2,
|
||||
sumCompileLatency: 6,
|
||||
maxCompileLatency: 2,
|
||||
|
||||
// coprocessor
|
||||
sumNumCopTasks: 8,
|
||||
maxCopProcessTime: 15,
|
||||
maxCopProcessAddress: "1.14.5.14",
|
||||
maxCopWaitTime: 4,
|
||||
maxCopWaitAddress: "19.19.8.10",
|
||||
|
||||
// TiKV
|
||||
sumProcessTime: 2,
|
||||
maxProcessTime: 1,
|
||||
sumWaitTime: 4,
|
||||
maxWaitTime: 1,
|
||||
sumBackoffTime: 4,
|
||||
maxBackoffTime: 2,
|
||||
|
||||
sumTotalKeys: 6,
|
||||
maxTotalKeys: 2,
|
||||
sumProcessedKeys: 16,
|
||||
maxProcessedKeys: 4,
|
||||
sumRocksdbDeleteSkippedCount: 16,
|
||||
maxRocksdbDeleteSkippedCount: 2,
|
||||
|
||||
sumRocksdbKeySkippedCount: 16,
|
||||
maxRocksdbKeySkippedCount: 3,
|
||||
sumRocksdbBlockCacheHitCount: 16,
|
||||
maxRocksdbBlockCacheHitCount: 3,
|
||||
sumRocksdbBlockReadCount: 6,
|
||||
maxRocksdbBlockReadCount: 3,
|
||||
sumRocksdbBlockReadByte: 8,
|
||||
maxRocksdbBlockReadByte: 4,
|
||||
|
||||
// txn
|
||||
commitCount: 16,
|
||||
sumPrewriteTime: 6,
|
||||
maxPrewriteTime: 3,
|
||||
sumCommitTime: 16,
|
||||
maxCommitTime: 5,
|
||||
sumGetCommitTsTime: 16,
|
||||
maxGetCommitTsTime: 8,
|
||||
sumCommitBackoffTime: 16,
|
||||
maxCommitBackoffTime: 8,
|
||||
|
||||
sumResolveLockTime: 16,
|
||||
maxResolveLockTime: 8,
|
||||
sumLocalLatchTime: 16,
|
||||
maxLocalLatchTime: 8,
|
||||
sumWriteKeys: 16,
|
||||
maxWriteKeys: 8,
|
||||
sumWriteSize: 16,
|
||||
maxWriteSize: 8,
|
||||
sumPrewriteRegionNum: 16,
|
||||
maxPrewriteRegionNum: 8,
|
||||
sumTxnRetry: 16,
|
||||
maxTxnRetry: 8,
|
||||
sumBackoffTimes: 16,
|
||||
backoffTypes: map[string]int{},
|
||||
|
||||
// plan cache
|
||||
planCacheHits: 16,
|
||||
|
||||
// other
|
||||
sumAffectedRows: 16,
|
||||
sumMem: 16,
|
||||
maxMem: 8,
|
||||
sumDisk: 16,
|
||||
maxDisk: 8,
|
||||
firstSeen: time.Unix(now-20, 0),
|
||||
lastSeen: time.Unix(now, 0),
|
||||
execRetryCount: 16,
|
||||
execRetryTime: 16,
|
||||
sumKVTotal: 4,
|
||||
sumPDTotal: 4,
|
||||
sumBackoffTotal: 4,
|
||||
sumWriteSQLRespTotal: 200,
|
||||
sumErrors: 16,
|
||||
}
|
||||
c.Assert(reflect.DeepEqual(&addTo, &expectedSum), Equals, true)
|
||||
}
|
||||
|
||||
func getAllEvicted(ssdbe *stmtSummaryByDigestEvicted) string {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
for e := ssdbe.history.Back(); e != nil; e = e.Prev() {
|
||||
|
||||
@ -340,6 +340,7 @@ func (ssMap *stmtSummaryByDigestMap) ToCurrentDatum(user *auth.UserIdentity, isS
|
||||
ssMap.Lock()
|
||||
values := ssMap.summaryMap.Values()
|
||||
beginTime := ssMap.beginTimeForCurInterval
|
||||
other := ssMap.other
|
||||
ssMap.Unlock()
|
||||
|
||||
rows := make([][]types.Datum, 0, len(values))
|
||||
@ -349,21 +350,29 @@ func (ssMap *stmtSummaryByDigestMap) ToCurrentDatum(user *auth.UserIdentity, isS
|
||||
rows = append(rows, record)
|
||||
}
|
||||
}
|
||||
if otherDatum := other.toCurrentDatum(); otherDatum != nil {
|
||||
rows = append(rows, otherDatum)
|
||||
}
|
||||
return rows
|
||||
}
|
||||
|
||||
// ToHistoryDatum converts history statements summaries to datum.
|
||||
func (ssMap *stmtSummaryByDigestMap) ToHistoryDatum(user *auth.UserIdentity, isSuper bool) [][]types.Datum {
|
||||
historySize := ssMap.historySize()
|
||||
|
||||
ssMap.Lock()
|
||||
values := ssMap.summaryMap.Values()
|
||||
other := ssMap.other
|
||||
ssMap.Unlock()
|
||||
|
||||
historySize := ssMap.historySize()
|
||||
rows := make([][]types.Datum, 0, len(values)*historySize)
|
||||
for _, value := range values {
|
||||
records := value.(*stmtSummaryByDigest).toHistoryDatum(historySize, user, isSuper)
|
||||
rows = append(rows, records...)
|
||||
}
|
||||
|
||||
otherDatum := other.toHistoryDatum(historySize)
|
||||
rows = append(rows, otherDatum...)
|
||||
return rows
|
||||
}
|
||||
|
||||
@ -884,8 +893,9 @@ func (ssElement *stmtSummaryByDigestElement) toDatum(ssbd *stmtSummaryByDigest)
|
||||
types.NewTime(types.FromGoTime(time.Unix(ssElement.beginTime, 0)), mysql.TypeTimestamp, 0),
|
||||
types.NewTime(types.FromGoTime(time.Unix(ssElement.endTime, 0)), mysql.TypeTimestamp, 0),
|
||||
ssbd.stmtType,
|
||||
ssbd.schemaName,
|
||||
ssbd.digest,
|
||||
// This behaviour follow MySQL. see more in https://dev.mysql.com/doc/refman/5.7/en/performance-schema-statement-digests.html
|
||||
convertEmptyToNil(ssbd.schemaName),
|
||||
convertEmptyToNil(ssbd.digest),
|
||||
ssbd.normalizedSQL,
|
||||
convertEmptyToNil(ssbd.tableNames),
|
||||
convertEmptyToNil(strings.Join(ssElement.indexNames, ",")),
|
||||
|
||||
@ -671,6 +671,52 @@ func (s *testStmtSummarySuite) TestToDatum(c *C) {
|
||||
datums = s.ssMap.ToHistoryDatum(nil, true)
|
||||
c.Assert(len(datums), Equals, 1)
|
||||
match(c, datums[0], expectedDatum...)
|
||||
|
||||
// test evict
|
||||
err := s.ssMap.SetMaxStmtCount("1", false)
|
||||
defer func() {
|
||||
// clean up
|
||||
err = s.ssMap.SetMaxStmtCount("", false)
|
||||
c.Assert(err, IsNil)
|
||||
}()
|
||||
|
||||
c.Assert(err, IsNil)
|
||||
stmtExecInfo2 := stmtExecInfo1
|
||||
stmtExecInfo2.Digest = "bandit sei"
|
||||
s.ssMap.AddStatement(stmtExecInfo2)
|
||||
c.Assert(s.ssMap.summaryMap.Size(), Equals, 1)
|
||||
datums = s.ssMap.ToCurrentDatum(nil, true)
|
||||
expectedEvictedDatum := []interface{}{n, e, "", "<nil>", "<nil>", "",
|
||||
"<nil>", "<nil>", stmtExecInfo1.User, 1, 0, 0, int64(stmtExecInfo1.TotalLatency),
|
||||
int64(stmtExecInfo1.TotalLatency), int64(stmtExecInfo1.TotalLatency), int64(stmtExecInfo1.TotalLatency),
|
||||
int64(stmtExecInfo1.ParseLatency), int64(stmtExecInfo1.ParseLatency), int64(stmtExecInfo1.CompileLatency),
|
||||
int64(stmtExecInfo1.CompileLatency), stmtExecInfo1.CopTasks.NumCopTasks, int64(stmtExecInfo1.CopTasks.MaxProcessTime),
|
||||
stmtExecInfo1.CopTasks.MaxProcessAddress, int64(stmtExecInfo1.CopTasks.MaxWaitTime),
|
||||
stmtExecInfo1.CopTasks.MaxWaitAddress, int64(stmtExecInfo1.ExecDetail.TimeDetail.ProcessTime), int64(stmtExecInfo1.ExecDetail.TimeDetail.ProcessTime),
|
||||
int64(stmtExecInfo1.ExecDetail.TimeDetail.WaitTime), int64(stmtExecInfo1.ExecDetail.TimeDetail.WaitTime), int64(stmtExecInfo1.ExecDetail.BackoffTime),
|
||||
int64(stmtExecInfo1.ExecDetail.BackoffTime), stmtExecInfo1.ExecDetail.ScanDetail.TotalKeys, stmtExecInfo1.ExecDetail.ScanDetail.TotalKeys,
|
||||
stmtExecInfo1.ExecDetail.ScanDetail.ProcessedKeys, stmtExecInfo1.ExecDetail.ScanDetail.ProcessedKeys,
|
||||
int64(stmtExecInfo1.ExecDetail.ScanDetail.RocksdbDeleteSkippedCount), int64(stmtExecInfo1.ExecDetail.ScanDetail.RocksdbDeleteSkippedCount),
|
||||
int64(stmtExecInfo1.ExecDetail.ScanDetail.RocksdbKeySkippedCount), int64(stmtExecInfo1.ExecDetail.ScanDetail.RocksdbKeySkippedCount),
|
||||
int64(stmtExecInfo1.ExecDetail.ScanDetail.RocksdbBlockCacheHitCount), int64(stmtExecInfo1.ExecDetail.ScanDetail.RocksdbBlockCacheHitCount),
|
||||
int64(stmtExecInfo1.ExecDetail.ScanDetail.RocksdbBlockReadCount), int64(stmtExecInfo1.ExecDetail.ScanDetail.RocksdbBlockReadCount),
|
||||
int64(stmtExecInfo1.ExecDetail.ScanDetail.RocksdbBlockReadByte), int64(stmtExecInfo1.ExecDetail.ScanDetail.RocksdbBlockReadByte),
|
||||
int64(stmtExecInfo1.ExecDetail.CommitDetail.PrewriteTime), int64(stmtExecInfo1.ExecDetail.CommitDetail.PrewriteTime),
|
||||
int64(stmtExecInfo1.ExecDetail.CommitDetail.CommitTime), int64(stmtExecInfo1.ExecDetail.CommitDetail.CommitTime),
|
||||
int64(stmtExecInfo1.ExecDetail.CommitDetail.GetCommitTsTime), int64(stmtExecInfo1.ExecDetail.CommitDetail.GetCommitTsTime),
|
||||
stmtExecInfo1.ExecDetail.CommitDetail.Mu.CommitBackoffTime, stmtExecInfo1.ExecDetail.CommitDetail.Mu.CommitBackoffTime,
|
||||
stmtExecInfo1.ExecDetail.CommitDetail.ResolveLockTime, stmtExecInfo1.ExecDetail.CommitDetail.ResolveLockTime,
|
||||
int64(stmtExecInfo1.ExecDetail.CommitDetail.LocalLatchTime), int64(stmtExecInfo1.ExecDetail.CommitDetail.LocalLatchTime),
|
||||
stmtExecInfo1.ExecDetail.CommitDetail.WriteKeys, stmtExecInfo1.ExecDetail.CommitDetail.WriteKeys,
|
||||
stmtExecInfo1.ExecDetail.CommitDetail.WriteSize, stmtExecInfo1.ExecDetail.CommitDetail.WriteSize,
|
||||
stmtExecInfo1.ExecDetail.CommitDetail.PrewriteRegionNum, stmtExecInfo1.ExecDetail.CommitDetail.PrewriteRegionNum,
|
||||
stmtExecInfo1.ExecDetail.CommitDetail.TxnRetry, stmtExecInfo1.ExecDetail.CommitDetail.TxnRetry, 0, 0, 1,
|
||||
fmt.Sprintf("%s:1", boTxnLockName), stmtExecInfo1.MemMax, stmtExecInfo1.MemMax, stmtExecInfo1.DiskMax, stmtExecInfo1.DiskMax,
|
||||
0, 0, 0, 0, 0, stmtExecInfo1.StmtCtx.AffectedRows(),
|
||||
t, t, 0, 0, 0, "", "", "", ""}
|
||||
expectedDatum[4] = stmtExecInfo2.Digest
|
||||
match(c, datums[0], expectedDatum...)
|
||||
match(c, datums[1], expectedEvictedDatum...)
|
||||
}
|
||||
|
||||
// Test AddStatement and ToDatum parallel.
|
||||
@ -849,7 +895,8 @@ func (s *testStmtSummarySuite) TestSetMaxStmtCountParallel(c *C) {
|
||||
wg.Wait()
|
||||
|
||||
datums := s.ssMap.ToCurrentDatum(nil, true)
|
||||
c.Assert(len(datums), Equals, 1)
|
||||
// due to evictions happened in cache, an additional record will be appended to the table.
|
||||
c.Assert(len(datums), Equals, 2)
|
||||
}
|
||||
|
||||
// Test setting EnableStmtSummary to 0.
|
||||
@ -1071,6 +1118,32 @@ func (s *testStmtSummarySuite) TestSummaryHistory(c *C) {
|
||||
c.Assert(err, IsNil)
|
||||
datum = s.ssMap.ToHistoryDatum(nil, true)
|
||||
c.Assert(len(datum), Equals, 5)
|
||||
|
||||
// test eviction
|
||||
s.ssMap.Clear()
|
||||
err = s.ssMap.SetMaxStmtCount("1", false)
|
||||
c.Assert(err, IsNil)
|
||||
defer func() {
|
||||
err := s.ssMap.SetMaxStmtCount("", false)
|
||||
c.Assert(err, IsNil)
|
||||
}()
|
||||
// insert first digest
|
||||
for i := 0; i < 6; i++ {
|
||||
s.ssMap.beginTimeForCurInterval = now + int64(i)*10
|
||||
s.ssMap.AddStatement(stmtExecInfo1)
|
||||
c.Assert(s.ssMap.summaryMap.Size(), Equals, 1)
|
||||
c.Assert(s.ssMap.other.history.Len(), Equals, 0)
|
||||
}
|
||||
// insert another digest to evict it
|
||||
stmtExecInfo2 := stmtExecInfo1
|
||||
stmtExecInfo2.Digest = "bandit digest"
|
||||
s.ssMap.AddStatement(stmtExecInfo2)
|
||||
c.Assert(s.ssMap.summaryMap.Size(), Equals, 1)
|
||||
// length of `other` should not longer than historySize.
|
||||
c.Assert(s.ssMap.other.history.Len(), Equals, 5)
|
||||
datum = s.ssMap.ToHistoryDatum(nil, true)
|
||||
// length of STATEMENT_SUMMARY_HISTORY == (history in cache) + (history evicted)
|
||||
c.Assert(len(datum), Equals, 6)
|
||||
}
|
||||
|
||||
// Test summary when PrevSQL is not empty.
|
||||
|
||||
Reference in New Issue
Block a user