Files
tidb/pkg/executor/stmtsummary.go

421 lines
11 KiB
Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"context"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/set"
"github.com/pingcap/tidb/pkg/util/stmtsummary"
stmtsummaryv2 "github.com/pingcap/tidb/pkg/util/stmtsummary/v2"
)
// defaultRetrieveCount is the maximum number of rows requested from the
// underlying rowsReader by a single retrieve call.
const defaultRetrieveCount = 1024
// buildStmtSummaryRetriever selects the memTableRetriever that serves a
// statements summary table.
//
// A nil extractor is treated as an empty one. An empty digest set is
// normalized to nil on the extractor itself (the caller's extractor is
// modified), because the retrievers only filter by digest when the set is
// non-nil.
//
// The choice is made in this order:
//   - extractor.SkipRequest: a dummyRetriever that yields no rows;
//   - Instance.StmtSummaryEnablePersistent in the global config: a
//     stmtSummaryRetrieverV2 bound to stmtsummaryv2.GlobalStmtSummary;
//   - otherwise: a stmtSummaryRetriever.
func buildStmtSummaryRetriever(
	table *model.TableInfo,
	columns []*model.ColumnInfo,
	extractor *plannercore.StatementsSummaryExtractor,
) memTableRetriever {
	if extractor == nil {
		extractor = &plannercore.StatementsSummaryExtractor{}
	}
	if extractor.Digests.Empty() {
		extractor.Digests = nil
	}

	switch {
	case extractor.SkipRequest:
		return &dummyRetriever{}
	case config.GetGlobalConfig().Instance.StmtSummaryEnablePersistent:
		return &stmtSummaryRetrieverV2{
			stmtSummary: stmtsummaryv2.GlobalStmtSummary,
			table:       table,
			columns:     columns,
			digests:     extractor.Digests,
			timeRanges:  buildTimeRanges(extractor.CoarseTimeRange),
		}
	default:
		return &stmtSummaryRetriever{
			table:   table,
			columns: columns,
			digests: extractor.Digests,
		}
	}
}
// dummyRetriever is the retriever chosen by buildStmtSummaryRetriever when the
// extractor sets SkipRequest; its retrieve method returns no rows.
type dummyRetriever struct {
// dummyCloser is declared outside this part of the file.
// NOTE(review): presumably it supplies the remaining no-op retriever methods — confirm.
dummyCloser
}
// retrieve ignores both contexts and always reports no rows and no error.
func (*dummyRetriever) retrieve(context.Context, sessionctx.Context) ([][]types.Datum, error) {
	return nil, nil
}
// stmtSummaryRetriever is used to retrieve statements summary.
// buildStmtSummaryRetriever selects it when the request is not skipped and
// config.GetGlobalConfig().Instance.StmtSummaryEnablePersistent is false.
type stmtSummaryRetriever struct {
// table is the memory table being read; its original name selects which
// kind of rows is produced.
table *model.TableInfo
// columns are the columns required by the query.
columns []*model.ColumnInfo
// digests, when non-nil, restricts summary rows to the given statement digests.
digests set.StringSet
// lazily initialized by ensureRowsReader on the first retrieve call
rowsReader *rowsReader
}
// retrieve returns the next batch of at most defaultRetrieveCount rows.
// The rows reader is created on the first call; an initialization failure is
// returned as-is.
func (e *stmtSummaryRetriever) retrieve(_ context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	err := e.ensureRowsReader(sctx)
	if err != nil {
		return nil, err
	}
	return e.rowsReader.read(defaultRetrieveCount)
}
// close closes the rows reader if one has been initialized; otherwise it is a
// no-op.
func (e *stmtSummaryRetriever) close() error {
	if e.rowsReader == nil {
		return nil
	}
	return e.rowsReader.close()
}
// getRuntimeStats always returns nil: this retriever reports no runtime statistics.
func (*stmtSummaryRetriever) getRuntimeStats() execdetails.RuntimeStats {
return nil
}
// ensureRowsReader lazily creates e.rowsReader. The evicted tables use
// initEvictedRowsReader; every other table uses initSummaryRowsReader.
// Whatever the initializer returns is stored, and its error is passed back.
func (e *stmtSummaryRetriever) ensureRowsReader(sctx sessionctx.Context) error {
	if e.rowsReader != nil {
		// already initialized by an earlier call
		return nil
	}

	build := e.initSummaryRowsReader
	if isEvictedTable(e.table.Name.O) {
		build = e.initEvictedRowsReader
	}

	reader, err := build(sctx)
	e.rowsReader = reader
	return err
}
// initEvictedRowsReader builds the reader for the evicted-count tables.
// It fails with an access-denied error unless the session passes
// checkPrivilege. For cluster tables the host info is appended to each row
// (the additional `INSTANCE` column) before the rows are trimmed.
func (e *stmtSummaryRetriever) initEvictedRowsReader(sctx sessionctx.Context) (*rowsReader, error) {
	if err := checkPrivilege(sctx); err != nil {
		return nil, err
	}

	rows := stmtsummary.StmtSummaryByDigestMap.ToEvictedCountDatum()
	if isClusterTable(e.table.Name.O) {
		// Additional column `INSTANCE` for cluster table
		var err error
		rows, err = infoschema.AppendHostInfoToRows(sctx, rows)
		if err != nil {
			return nil, err
		}
	}

	// rows are full-columned, so we need to adjust them to the required columns.
	trimmed := adjustColumns(rows, e.columns, e.table)
	return newSimpleRowsReader(trimmed), nil
}
// initSummaryRowsReader builds the reader for the non-evicted statements
// summary tables. It creates a stmtsummary reader from the session's user,
// PROCESS privilege, time zone and (for cluster tables) instance address,
// installs a digest checker when e.digests is non-nil, and then fetches the
// cumulative, current or history rows according to the table name. A table
// matching none of those kinds yields an empty reader.
func (e *stmtSummaryRetriever) initSummaryRowsReader(sctx sessionctx.Context) (*rowsReader, error) {
	sessVars := sctx.GetSessionVars()
	currentUser := sessVars.User
	loc := sessVars.StmtCtx.TimeZone()
	tableName := e.table.Name.O
	processPriv := hasPriv(sctx, mysql.ProcessPriv)

	addr, err := clusterTableInstanceAddr(sctx, tableName)
	if err != nil {
		return nil, err
	}

	reader := stmtsummary.NewStmtSummaryReader(currentUser, processPriv, e.columns, addr, loc)
	if e.digests != nil {
		// set checker to filter out statements not matching the given digests
		reader.SetChecker(stmtsummary.NewStmtSummaryChecker(e.digests))
	}

	var rows [][]types.Datum
	switch {
	case isCumulativeTable(tableName):
		rows = reader.GetStmtSummaryCumulativeRows()
	case isCurrentTable(tableName):
		rows = reader.GetStmtSummaryCurrentRows()
	case isHistoryTable(tableName):
		rows = reader.GetStmtSummaryHistoryRows()
	}
	return newSimpleRowsReader(rows), nil
}
// stmtSummaryRetrieverV2 is used to retrieve statements summary when
// config.GetGlobalConfig().Instance.StmtSummaryEnablePersistent is true.
type stmtSummaryRetrieverV2 struct {
// stmtSummary is the v2 statement summary the rows are read from;
// buildStmtSummaryRetriever sets it to stmtsummaryv2.GlobalStmtSummary.
stmtSummary *stmtsummaryv2.StmtSummary
// table is the memory table being read; its original name selects which
// kind of rows is produced.
table *model.TableInfo
// columns are the columns required by the query.
columns []*model.ColumnInfo
// digests is the digest filter handed to the mem and history readers.
digests set.StringSet
// timeRanges is the time filter handed to the mem and history readers;
// it is built from the extractor's CoarseTimeRange and may be nil.
timeRanges []*stmtsummaryv2.StmtTimeRange
// lazily initialized by ensureRowsReader on the first retrieve call
rowsReader *rowsReader
}
// retrieve returns the next batch of at most defaultRetrieveCount rows.
// The rows reader is created on the first call; an initialization failure is
// returned as-is.
func (r *stmtSummaryRetrieverV2) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	err := r.ensureRowsReader(ctx, sctx)
	if err != nil {
		return nil, err
	}
	return r.rowsReader.read(defaultRetrieveCount)
}
// close closes the rows reader if one has been initialized; otherwise it is a
// no-op.
func (r *stmtSummaryRetrieverV2) close() error {
	if r.rowsReader == nil {
		return nil
	}
	return r.rowsReader.close()
}
// getRuntimeStats always returns nil: this retriever reports no runtime statistics.
func (*stmtSummaryRetrieverV2) getRuntimeStats() execdetails.RuntimeStats {
return nil
}
// ensureRowsReader lazily creates r.rowsReader. The evicted tables use
// initEvictedRowsReader; every other table uses initSummaryRowsReader.
// Whatever the initializer returns is stored, and its error is passed back.
func (r *stmtSummaryRetrieverV2) ensureRowsReader(ctx context.Context, sctx sessionctx.Context) error {
	if r.rowsReader != nil {
		// already initialized by an earlier call
		return nil
	}

	var (
		reader *rowsReader
		err    error
	)
	switch {
	case isEvictedTable(r.table.Name.O):
		reader, err = r.initEvictedRowsReader(sctx)
	default:
		reader, err = r.initSummaryRowsReader(ctx, sctx)
	}
	r.rowsReader = reader
	return err
}
// initEvictedRowsReader builds the reader for the evicted-count tables.
// It fails with an access-denied error unless the session passes
// checkPrivilege. The statement summary contributes at most one row (none
// when Evicted returns nil). For cluster tables the host info is appended to
// each row (the additional `INSTANCE` column) before the rows are trimmed.
func (r *stmtSummaryRetrieverV2) initEvictedRowsReader(sctx sessionctx.Context) (*rowsReader, error) {
	if err := checkPrivilege(sctx); err != nil {
		return nil, err
	}

	var rows [][]types.Datum
	if evicted := r.stmtSummary.Evicted(); evicted != nil {
		rows = append(rows, evicted)
	}

	if isClusterTable(r.table.Name.O) {
		// Additional column `INSTANCE` for cluster table
		var err error
		rows, err = infoschema.AppendHostInfoToRows(sctx, rows)
		if err != nil {
			return nil, err
		}
	}

	// rows are full-columned, so we need to adjust them to the required columns.
	trimmed := adjustColumns(rows, r.columns, r.table)
	return newSimpleRowsReader(trimmed), nil
}
// initSummaryRowsReader builds the rows reader for the non-evicted statements
// summary tables when the persistent (v2) statement summary is enabled.
//
// The in-memory rows are always read through stmtsummaryv2.NewMemReader using
// the session's user, PROCESS privilege, time zone and (for cluster tables)
// instance address, together with the digest and time-range filters.
//
//   - History table: the in-memory rows are returned first, followed by the
//     rows pulled from a stmtsummaryv2 history reader.
//   - Current table: only the in-memory rows are returned.
//   - Any other table: an empty reader is returned.
//
// The function never returns a nil reader together with a nil error: retrieve
// calls read on the result, and read dereferences its receiver.
func (r *stmtSummaryRetrieverV2) initSummaryRowsReader(ctx context.Context, sctx sessionctx.Context) (*rowsReader, error) {
	vars := sctx.GetSessionVars()
	user := vars.User
	tz := vars.StmtCtx.TimeZone()
	tableName := r.table.Name.O
	priv := hasPriv(sctx, mysql.ProcessPriv)

	instanceAddr, err := clusterTableInstanceAddr(sctx, tableName)
	if err != nil {
		return nil, err
	}

	mem := stmtsummaryv2.NewMemReader(r.stmtSummary, r.columns, instanceAddr, tz, user, priv, r.digests, r.timeRanges)
	memRows := mem.Rows()

	switch {
	case isHistoryTable(tableName):
		// history table should return all rows including mem and disk
		concurrent := vars.Concurrency.DistSQLScanConcurrency()
		history, err := stmtsummaryv2.NewHistoryReader(ctx, r.columns, instanceAddr, tz, user, priv, r.digests, r.timeRanges, concurrent)
		if err != nil {
			return nil, err
		}
		return newRowsReader(memRows, history), nil
	case isCurrentTable(tableName):
		return newSimpleRowsReader(memRows), nil
	default:
		// No v2 source is wired up for this table here. Return an empty
		// reader, as stmtSummaryRetriever.initSummaryRowsReader does for an
		// unmatched table, rather than a nil reader that would panic in read.
		return newSimpleRowsReader(nil), nil
	}
}
// rowsPuller is a closeable source of row batches consumed by rowsReader.
type rowsPuller interface {
Closeable
// Rows returns the next batch of rows. rowsReader.pull treats an empty
// batch as the end of the source and closes the puller.
Rows() ([][]types.Datum, error)
}
// rowsReader hands out rows in bounded batches. It starts from an in-memory
// slice and, when a puller is set, refills from it once the slice is drained.
type rowsReader struct {
// puller is the optional source of further rows; it is nil when there is
// none, or after pull has exhausted and closed it.
puller rowsPuller
// rows holds the rows that read has not returned yet.
rows [][]types.Datum
}
// newSimpleRowsReader returns a rowsReader that serves only the given rows
// and has no puller.
func newSimpleRowsReader(rows [][]types.Datum) *rowsReader {
	rr := new(rowsReader)
	rr.rows = rows
	return rr
}
// newRowsReader returns a rowsReader that serves the given rows first and
// then refills from puller once they are drained.
func newRowsReader(rows [][]types.Datum, puller rowsPuller) *rowsReader {
	rr := &rowsReader{rows: rows}
	rr.puller = puller
	return rr
}
// read returns up to maxCount buffered rows and removes them from the buffer.
// It first gives pull a chance to refill an empty buffer. When the buffer
// holds no more than maxCount rows, the whole buffer is returned and reset to
// nil.
func (r *rowsReader) read(maxCount int) ([][]types.Datum, error) {
	if err := r.pull(); err != nil {
		return nil, err
	}

	if maxCount < len(r.rows) {
		// hand out the leading maxCount rows and keep the rest buffered
		batch := r.rows[:maxCount]
		r.rows = r.rows[maxCount:]
		return batch, nil
	}

	batch := r.rows
	r.rows = nil
	return batch, nil
}
// pull refills the buffer from the puller when the buffer is empty.
// It does nothing when there is no puller or when buffered rows remain.
// An empty batch from the puller marks its end: the puller is closed and,
// if closing succeeds, dropped so that later calls become no-ops.
func (r *rowsReader) pull() error {
	if r.puller == nil || len(r.rows) > 0 {
		return nil
	}

	fetched, err := r.puller.Rows()
	if err != nil {
		return err
	}
	if len(fetched) > 0 {
		// pulled new rows from the puller
		r.rows = fetched
		return nil
	}

	// reach the end of the puller
	if err := r.puller.Close(); err != nil {
		return err
	}
	r.puller = nil
	return nil
}
// close closes the puller if one is still attached; otherwise it is a no-op.
func (r *rowsReader) close() error {
	if r.puller == nil {
		return nil
	}
	return r.puller.Close()
}
// isClusterTable reports whether originalTableName is one of the cluster
// variants of the statements summary tables.
func isClusterTable(originalTableName string) bool {
	return originalTableName == infoschema.ClusterTableStatementsSummary ||
		originalTableName == infoschema.ClusterTableStatementsSummaryHistory ||
		originalTableName == infoschema.ClusterTableStatementsSummaryEvicted ||
		originalTableName == infoschema.ClusterTableTiDBStatementsStats
}
// isCumulativeTable reports whether originalTableName is the statements stats
// table or its cluster variant.
func isCumulativeTable(originalTableName string) bool {
	return originalTableName == infoschema.TableTiDBStatementsStats ||
		originalTableName == infoschema.ClusterTableTiDBStatementsStats
}
// isCurrentTable reports whether originalTableName is the statements summary
// table or its cluster variant.
func isCurrentTable(originalTableName string) bool {
	return originalTableName == infoschema.TableStatementsSummary ||
		originalTableName == infoschema.ClusterTableStatementsSummary
}
// isHistoryTable reports whether originalTableName is the statements summary
// history table or its cluster variant.
func isHistoryTable(originalTableName string) bool {
	return originalTableName == infoschema.TableStatementsSummaryHistory ||
		originalTableName == infoschema.ClusterTableStatementsSummaryHistory
}
// isEvictedTable reports whether originalTableName is the statements summary
// evicted table or its cluster variant.
func isEvictedTable(originalTableName string) bool {
	return originalTableName == infoschema.TableStatementsSummaryEvicted ||
		originalTableName == infoschema.ClusterTableStatementsSummaryEvicted
}
// checkPrivilege returns nil when the session holds mysql.ProcessPriv and an
// ErrSpecificAccessDenied error naming "PROCESS" otherwise.
func checkPrivilege(sctx sessionctx.Context) error {
	if hasPriv(sctx, mysql.ProcessPriv) {
		return nil
	}
	return plannererrors.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
}
// clusterTableInstanceAddr returns the result of infoschema.GetInstanceAddr
// for cluster tables, and an empty address with a nil error for every other
// table.
func clusterTableInstanceAddr(sctx sessionctx.Context, originalTableName string) (string, error) {
	if !isClusterTable(originalTableName) {
		return "", nil
	}
	return infoschema.GetInstanceAddr(sctx)
}
// buildTimeRanges converts an optional planner time range into a one-element
// slice of StmtTimeRange whose bounds are Unix timestamps in seconds.
// A nil tr yields a nil slice.
func buildTimeRanges(tr *plannercore.TimeRange) []*stmtsummaryv2.StmtTimeRange {
	if tr == nil {
		return nil
	}
	single := &stmtsummaryv2.StmtTimeRange{
		Begin: tr.StartTime.Unix(),
		End:   tr.EndTime.Unix(),
	}
	return []*stmtsummaryv2.StmtTimeRange{single}
}