planner: unify ways to execute SQLs in stats_handle package (#47324)

ref pingcap/tidb#46905
This commit is contained in:
Yuanjia Zhang
2023-09-27 17:31:46 +08:00
committed by GitHub
parent 257278d064
commit da5cfbc5f8
18 changed files with 230 additions and 316 deletions

View File

@ -117,13 +117,13 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex
case infoschema.TableStatistics:
e.setDataForStatistics(sctx, dbs)
case infoschema.TableTables:
err = e.setDataFromTables(ctx, sctx, dbs)
err = e.setDataFromTables(sctx, dbs)
case infoschema.TableReferConst:
err = e.setDataFromReferConst(sctx, dbs)
case infoschema.TableSequences:
e.setDataFromSequences(sctx, dbs)
case infoschema.TablePartitions:
err = e.setDataFromPartitions(ctx, sctx, dbs)
err = e.setDataFromPartitions(sctx, dbs)
case infoschema.TableClusterInfo:
err = e.dataForTiDBClusterInfo(sctx)
case infoschema.TableAnalyzeStatus:
@ -487,8 +487,8 @@ func (e *memtableRetriever) setDataFromReferConst(sctx sessionctx.Context, schem
return nil
}
func (e *memtableRetriever) setDataFromTables(ctx context.Context, sctx sessionctx.Context, schemas []*model.DBInfo) error {
err := cache.TableRowStatsCache.Update(ctx, sctx)
func (e *memtableRetriever) setDataFromTables(sctx sessionctx.Context, schemas []*model.DBInfo) error {
err := cache.TableRowStatsCache.Update(sctx)
if err != nil {
return err
}
@ -899,9 +899,9 @@ func calcCharOctLength(lenInChar int, cs string) int {
return lenInBytes
}
func (e *memtableRetriever) setDataFromPartitions(ctx context.Context, sctx sessionctx.Context, schemas []*model.DBInfo) error {
func (e *memtableRetriever) setDataFromPartitions(sctx sessionctx.Context, schemas []*model.DBInfo) error {
cache := cache.TableRowStatsCache
err := cache.Update(ctx, sctx)
err := cache.Update(sctx)
if err != nil {
return err
}

View File

@ -7,7 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//infoschema",
"//kv",
"//metrics",
"//parser/ast",
"//parser/model",

View File

@ -15,7 +15,6 @@
package autoanalyze
import (
"context"
"fmt"
"math"
"math/rand"
@ -25,7 +24,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
@ -64,10 +62,9 @@ func parseAnalyzePeriod(start, end string) (time.Time, time.Time, error) {
return s, e, err
}
func getAutoAnalyzeParameters(exec sqlexec.RestrictedSQLExecutor) map[string]string {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
func getAutoAnalyzeParameters(sctx sessionctx.Context) map[string]string {
sql := "select variable_name, variable_value from mysql.global_variables where variable_name in (%?, %?, %?)"
rows, _, err := exec.ExecRestrictedSQL(ctx, nil, sql, variable.TiDBAutoAnalyzeRatio, variable.TiDBAutoAnalyzeStartTime, variable.TiDBAutoAnalyzeEndTime)
rows, _, err := statsutil.ExecWithOpts(sctx, nil, sql, variable.TiDBAutoAnalyzeRatio, variable.TiDBAutoAnalyzeStartTime, variable.TiDBAutoAnalyzeEndTime)
if err != nil {
return map[string]string{}
}
@ -103,9 +100,8 @@ func HandleAutoAnalyze(sctx sessionctx.Context,
logutil.BgLogger().Error("HandleAutoAnalyze panicked", zap.Any("error", r), zap.Stack("stack"))
}
}()
exec := sctx.(sqlexec.RestrictedSQLExecutor)
dbs := is.AllSchemaNames()
parameters := getAutoAnalyzeParameters(exec)
parameters := getAutoAnalyzeParameters(sctx)
autoAnalyzeRatio := parseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
start, end, err := parseAnalyzePeriod(parameters[variable.TiDBAutoAnalyzeStartTime], parameters[variable.TiDBAutoAnalyzeEndTime])
if err != nil {
@ -168,7 +164,7 @@ func HandleAutoAnalyze(sctx sessionctx.Context,
if pi == nil {
statsTbl := opt.GetTableStats(tblInfo)
sql := "analyze table %n.%n"
analyzed := autoAnalyzeTable(sctx, exec, opt, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O)
analyzed := autoAnalyzeTable(sctx, opt, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O)
if analyzed {
// analyze one table at a time to let it get the freshest parameters.
// others will be analyzed next round which is just 3s later.
@ -184,7 +180,7 @@ func HandleAutoAnalyze(sctx sessionctx.Context,
}
}
if pruneMode == variable.Dynamic {
analyzed := autoAnalyzePartitionTableInDynamicMode(sctx, exec, opt, tblInfo, partitionDefs, db, autoAnalyzeRatio)
analyzed := autoAnalyzePartitionTableInDynamicMode(sctx, opt, tblInfo, partitionDefs, db, autoAnalyzeRatio)
if analyzed {
return true
}
@ -193,7 +189,7 @@ func HandleAutoAnalyze(sctx sessionctx.Context,
for _, def := range partitionDefs {
sql := "analyze table %n.%n partition %n"
statsTbl := opt.GetPartitionStats(tblInfo, def.ID)
analyzed := autoAnalyzeTable(sctx, exec, opt, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O, def.Name.O)
analyzed := autoAnalyzeTable(sctx, opt, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O, def.Name.O)
if analyzed {
return true
}
@ -207,7 +203,6 @@ func HandleAutoAnalyze(sctx sessionctx.Context,
var AutoAnalyzeMinCnt int64 = 1000
func autoAnalyzeTable(sctx sessionctx.Context,
exec sqlexec.RestrictedSQLExecutor,
opt *Opt,
tblInfo *model.TableInfo, statsTbl *statistics.Table,
ratio float64, sql string, params ...interface{}) bool {
@ -222,7 +217,7 @@ func autoAnalyzeTable(sctx sessionctx.Context,
logutil.BgLogger().Info("auto analyze triggered", zap.String("category", "stats"), zap.String("sql", escaped), zap.String("reason", reason))
tableStatsVer := sctx.GetSessionVars().AnalyzeVersion
statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer)
execAutoAnalyze(sctx, exec, opt, tableStatsVer, sql, params...)
execAutoAnalyze(sctx, opt, tableStatsVer, sql, params...)
return true
}
for _, idx := range tblInfo.Indices {
@ -236,7 +231,7 @@ func autoAnalyzeTable(sctx sessionctx.Context,
logutil.BgLogger().Info("auto analyze for unanalyzed", zap.String("category", "stats"), zap.String("sql", escaped))
tableStatsVer := sctx.GetSessionVars().AnalyzeVersion
statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer)
execAutoAnalyze(sctx, exec, opt, tableStatsVer, sqlWithIdx, paramsWithIdx...)
execAutoAnalyze(sctx, opt, tableStatsVer, sqlWithIdx, paramsWithIdx...)
return true
}
}
@ -288,7 +283,6 @@ func TableAnalyzed(tbl *statistics.Table) bool {
}
func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context,
exec sqlexec.RestrictedSQLExecutor,
opt *Opt,
tblInfo *model.TableInfo, partitionDefs []model.PartitionDefinition,
db string, ratio float64) bool {
@ -335,7 +329,7 @@ func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context,
logutil.BgLogger().Info("auto analyze triggered", zap.String("category", "stats"),
zap.String("table", tblInfo.Name.String()),
zap.Any("partitions", partitionNames[start:end]))
execAutoAnalyze(sctx, exec, opt, tableStatsVer, sql, params...)
execAutoAnalyze(sctx, opt, tableStatsVer, sql, params...)
}
return true
}
@ -366,7 +360,7 @@ func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context,
zap.String("table", tblInfo.Name.String()),
zap.String("index", idx.Name.String()),
zap.Any("partitions", partitionNames[start:end]))
execAutoAnalyze(sctx, exec, opt, tableStatsVer, sql, params...)
execAutoAnalyze(sctx, opt, tableStatsVer, sql, params...)
}
return true
}
@ -381,12 +375,11 @@ var execOptionForAnalyze = map[int]sqlexec.OptionFuncAlias{
}
func execAutoAnalyze(sctx sessionctx.Context,
exec sqlexec.RestrictedSQLExecutor,
opt *Opt,
statsVer int,
sql string, params ...interface{}) {
startTime := time.Now()
_, _, err := execRestrictedSQLWithStatsVer(sctx, exec, opt, statsVer, sql, params...)
_, _, err := execAnalyzeStmt(sctx, opt, statsVer, sql, params...)
dur := time.Since(startTime)
metrics.AutoAnalyzeHistogram.Observe(dur.Seconds())
if err != nil {
@ -401,12 +394,10 @@ func execAutoAnalyze(sctx sessionctx.Context,
}
}
func execRestrictedSQLWithStatsVer(sctx sessionctx.Context,
exec sqlexec.RestrictedSQLExecutor,
func execAnalyzeStmt(sctx sessionctx.Context,
opt *Opt,
statsVer int,
sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
ctx := statsutil.StatsCtx(context.Background())
pruneMode := sctx.GetSessionVars().PartitionPruneMode.Load()
analyzeSnapshot := sctx.GetSessionVars().EnableAnalyzeSnapshot
optFuncs := []sqlexec.OptionFuncAlias{
@ -416,5 +407,5 @@ func execRestrictedSQLWithStatsVer(sctx sessionctx.Context,
sqlexec.ExecOptionUseCurSession,
sqlexec.ExecOptionWithSysProcTrack(opt.AutoAnalyzeProcIDGetter(), opt.SysProcTracker.Track, opt.SysProcTracker.UnTrack),
}
return exec.ExecRestrictedSQL(ctx, optFuncs, sql, params...)
return statsutil.ExecWithOpts(sctx, optFuncs, sql, params...)
}

View File

@ -29,10 +29,10 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle/cache"
"github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)
@ -66,7 +66,7 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *cache.Stat
func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (*cache.StatsCache, error) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta"
rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return nil, errors.Trace(err)
}
@ -234,7 +234,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *cach
func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache *cache.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
@ -257,7 +257,7 @@ func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache *cache.
func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *cache.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
@ -306,7 +306,7 @@ func (*Handle) initStatsTopN4Chunk(cache *cache.StatsCache, iter *chunk.Iterator
func (h *Handle) initStatsTopN(cache *cache.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1"
rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
@ -356,7 +356,7 @@ func (*Handle) initStatsFMSketch4Chunk(cache *cache.StatsCache, iter *chunk.Iter
func (h *Handle) initStatsFMSketch(cache *cache.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, value from mysql.stats_fm_sketch"
rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
@ -429,7 +429,7 @@ func (*Handle) initStatsBuckets4Chunk(cache *cache.StatsCache, iter *chunk.Itera
func (h *Handle) initStatsBuckets(cache *cache.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
@ -468,14 +468,13 @@ func (h *Handle) initStatsBuckets(cache *cache.StatsCache) error {
// InitStatsLite initiates the stats cache. The function is liter and faster than InitStats.
// Column/index stats are not loaded, i.e., we only load scalars such as NDV, NullCount, Correlation and don't load CMSketch/Histogram/TopN.
func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
defer func() {
_, err1 := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "commit")
_, err1 := util.Exec(h.initStatsCtx, "commit")
if err == nil && err1 != nil {
err = err1
}
}()
_, err = h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "begin")
_, err = util.Exec(h.initStatsCtx, "begin")
if err != nil {
return err
}
@ -496,14 +495,13 @@ func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) {
// Column stats are not loaded, i.e., we only load scalars such as NDV, NullCount, Correlation and don't load CMSketch/Histogram/TopN.
func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
defer func() {
_, err1 := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "commit")
_, err1 := util.Exec(h.initStatsCtx, "commit")
if err == nil && err1 != nil {
err = err1
}
}()
_, err = h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "begin")
_, err = util.Exec(h.initStatsCtx, "begin")
if err != nil {
return err
}

View File

@ -22,7 +22,6 @@ go_library(
"//types",
"//util/chunk",
"//util/logutil",
"//util/sqlexec",
"//util/syncutil",
"@org_uber_go_zap//:zap",
],

View File

@ -15,7 +15,6 @@
package cache
import (
"context"
"strconv"
"strings"
"sync/atomic"
@ -34,7 +33,6 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/syncutil"
"go.uber.org/zap"
)
@ -223,20 +221,19 @@ func (c *StatsTableRowCache) GetColLength(id tableHistID) uint64 {
}
// Update tries to update the cache.
func (c *StatsTableRowCache) Update(ctx context.Context, sctx sessionctx.Context) error {
func (c *StatsTableRowCache) Update(sctx sessionctx.Context) error {
c.mu.Lock()
defer c.mu.Unlock()
ctx = util.StatsCtx(ctx)
if time.Since(c.modifyTime) < tableStatsCacheExpiry {
if len(c.dirtyIDs) > 0 {
tableRows, err := getRowCountTables(ctx, sctx, c.dirtyIDs...)
tableRows, err := getRowCountTables(sctx, c.dirtyIDs...)
if err != nil {
return err
}
for id, tr := range tableRows {
c.tableRows[id] = tr
}
colLength, err := getColLengthTables(ctx, sctx, c.dirtyIDs...)
colLength, err := getColLengthTables(sctx, c.dirtyIDs...)
if err != nil {
return err
}
@ -247,11 +244,11 @@ func (c *StatsTableRowCache) Update(ctx context.Context, sctx sessionctx.Context
}
return nil
}
tableRows, err := getRowCountTables(ctx, sctx)
tableRows, err := getRowCountTables(sctx)
if err != nil {
return err
}
colLength, err := getColLengthTables(ctx, sctx)
colLength, err := getColLengthTables(sctx)
if err != nil {
return err
}
@ -262,16 +259,15 @@ func (c *StatsTableRowCache) Update(ctx context.Context, sctx sessionctx.Context
return nil
}
func getRowCountTables(ctx context.Context, sctx sessionctx.Context, tableIDs ...int64) (map[int64]uint64, error) {
exec := sctx.(sqlexec.RestrictedSQLExecutor)
func getRowCountTables(sctx sessionctx.Context, tableIDs ...int64) (map[int64]uint64, error) {
var rows []chunk.Row
var err error
if len(tableIDs) == 0 {
rows, _, err = exec.ExecRestrictedSQL(ctx, nil, "select table_id, count from mysql.stats_meta")
rows, _, err = util.ExecWithOpts(sctx, nil, "select table_id, count from mysql.stats_meta")
} else {
inTblIDs := buildInTableIDsString(tableIDs)
sql := "select table_id, count from mysql.stats_meta where " + inTblIDs
rows, _, err = exec.ExecRestrictedSQL(ctx, nil, sql)
rows, _, err = util.ExecWithOpts(sctx, nil, sql)
}
if err != nil {
return nil, err
@ -304,17 +300,16 @@ type tableHistID struct {
histID int64
}
func getColLengthTables(ctx context.Context, sctx sessionctx.Context, tableIDs ...int64) (map[tableHistID]uint64, error) {
exec := sctx.(sqlexec.RestrictedSQLExecutor)
func getColLengthTables(sctx sessionctx.Context, tableIDs ...int64) (map[tableHistID]uint64, error) {
var rows []chunk.Row
var err error
if len(tableIDs) == 0 {
sql := "select table_id, hist_id, tot_col_size from mysql.stats_histograms where is_index = 0"
rows, _, err = exec.ExecRestrictedSQL(ctx, nil, sql)
rows, _, err = util.ExecWithOpts(sctx, nil, sql)
} else {
inTblIDs := buildInTableIDsString(tableIDs)
sql := "select table_id, hist_id, tot_col_size from mysql.stats_histograms where is_index = 0 and " + inTblIDs
rows, _, err = exec.ExecRestrictedSQL(ctx, nil, sql)
rows, _, err = util.ExecWithOpts(sctx, nil, sql)
}
if err != nil {
return nil, err

View File

@ -132,27 +132,26 @@ func (h *Handle) updateStatsVersion() error {
return err
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
sctx := se.(sessionctx.Context)
ctx := statsutil.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin")
_, err = statsutil.Exec(sctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = statsutil.FinishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(sctx, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
return errors.Trace(err)
}
if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?", startTS); err != nil {
if _, err = statsutil.Exec(sctx, "update mysql.stats_meta set version = %?", startTS); err != nil {
return err
}
if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_extended set version = %?", startTS); err != nil {
if _, err = statsutil.Exec(sctx, "update mysql.stats_extended set version = %?", startTS); err != nil {
return err
}
if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set version = %?", startTS); err != nil {
if _, err = statsutil.Exec(sctx, "update mysql.stats_histograms set version = %?", startTS); err != nil {
return err
}
@ -270,17 +269,16 @@ func (h *Handle) changeGlobalStatsID(from, to int64) (err error) {
return err
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := statsutil.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
sctx := se.(sessionctx.Context)
_, err = statsutil.Exec(sctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = statsutil.FinishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(sctx, err)
}()
for _, table := range []string{"stats_meta", "stats_top_n", "stats_fm_sketch", "stats_buckets", "stats_histograms", "column_stats_usage"} {
_, err = exec.ExecuteInternal(ctx, "update mysql."+table+" set table_id = %? where table_id = %?", to, from)
_, err = statsutil.Exec(sctx, "update mysql."+table+" set table_id = %? where table_id = %?", to, from)
if err != nil {
return err
}
@ -327,31 +325,30 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (e
return err
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := statsutil.StatsCtx(context.Background())
sctx := se.(sessionctx.Context)
_, err = exec.ExecuteInternal(ctx, "begin")
_, err = statsutil.Exec(sctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = statsutil.FinishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(sctx, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
return errors.Trace(err)
}
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil {
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil {
return err
}
statsVer = startTS
for _, col := range info.Columns {
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil {
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil {
return err
}
}
for _, idx := range info.Indices {
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 1, %?, 0, %?)", physicalID, idx.ID, startTS); err != nil {
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 1, %?, 0, %?)", physicalID, idx.ID, startTS); err != nil {
return err
}
}
@ -372,21 +369,20 @@ func (h *Handle) resetTableStats2KVForDrop(physicalID int64) (err error) {
return err
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := statsutil.StatsCtx(context.Background())
sctx := se.(sessionctx.Context)
_, err = exec.ExecuteInternal(ctx, "begin")
_, err = statsutil.Exec(sctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = statsutil.FinishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(sctx, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
return errors.Trace(err)
}
if _, err := exec.ExecuteInternal(ctx, "update mysql.stats_meta set version=%? where table_id =%?", startTS, physicalID); err != nil {
if _, err := statsutil.Exec(sctx, "update mysql.stats_meta set version=%? where table_id =%?", startTS, physicalID); err != nil {
return err
}
return nil
@ -407,25 +403,24 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
return err
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
sctx := se.(sessionctx.Context)
ctx := statsutil.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin")
_, err = statsutil.Exec(sctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = statsutil.FinishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(sctx, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
return errors.Trace(err)
}
sctx := se.(sessionctx.Context)
// First of all, we update the version.
_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, physicalID)
_, err = statsutil.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, physicalID)
if err != nil {
return
}
@ -434,7 +429,7 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
if sctx.GetSessionVars().StmtCtx.AffectedRows() > 0 {
// By this step we can get the count of this table, then we can sure the count and repeats of bucket.
var rs sqlexec.RecordSet
rs, err = exec.ExecuteInternal(ctx, "select count from mysql.stats_meta where table_id = %?", physicalID)
rs, err = statsutil.Exec(sctx, "select count from mysql.stats_meta where table_id = %?", physicalID)
if err != nil {
return
}
@ -453,12 +448,12 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
}
if value.IsNull() {
// If the adding column has default value null, all the existing rows have null value on the newly added column.
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil {
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil {
return err
}
} else {
// If this stats exists, we insert histogram meta first, the distinct_count will always be one.
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil {
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil {
return err
}
value, err = value.ConvertTo(sctx.GetSessionVars().StmtCtx, types.NewFieldType(mysql.TypeBlob))
@ -466,7 +461,7 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
return
}
// There must be only one bucket for this new column and the value is the default value.
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil {
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil {
return err
}
}

View File

@ -14,7 +14,6 @@ go_library(
"//statistics/handle/util",
"//util/logutil",
"//util/mathutil",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@org_uber_go_zap//:zap",
],

View File

@ -15,7 +15,6 @@
package extstats
import (
"context"
"encoding/json"
"fmt"
"slices"
@ -29,7 +28,6 @@ import (
"github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)
@ -50,7 +48,6 @@ func InsertExtendedStats(sctx sessionctx.Context,
recordHistoricalStatsMeta(tableID, statsVer, StatsMetaHistorySourceExtendedStats)
}
}()
exec := sctx.(sqlexec.RestrictedSQLExecutor)
slices.Sort(colIDs)
bytes, err := json.Marshal(colIDs)
if err != nil {
@ -58,17 +55,15 @@ func InsertExtendedStats(sctx sessionctx.Context,
}
strColIDs := string(bytes)
ctx := util.StatsCtx(context.Background())
sqlExecutor := exec.(sqlexec.SQLExecutor)
_, err = sqlExecutor.ExecuteInternal(ctx, "begin pessimistic")
_, err = util.Exec(sctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = util.FinishTransaction(ctx, sqlExecutor, err)
err = util.FinishTransaction(sctx, err)
}()
// No need to use `exec.ExecuteInternal` since we have acquired the lock.
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)", tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed)
rows, _, err := util.ExecRows(sctx, "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)", tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed)
if err != nil {
return errors.Trace(err)
}
@ -91,12 +86,12 @@ func InsertExtendedStats(sctx sessionctx.Context,
return errors.Trace(err)
}
// Bump version in `mysql.stats_meta` to trigger stats cache refresh.
if _, err = sqlExecutor.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil {
if _, err = util.Exec(sctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil {
return err
}
statsVer = version
// Remove the existing 'deleted' records.
if _, err = sqlExecutor.ExecuteInternal(ctx, "DELETE FROM mysql.stats_extended WHERE name = %? and table_id = %?", statsName, tableID); err != nil {
if _, err = util.Exec(sctx, "DELETE FROM mysql.stats_extended WHERE name = %? and table_id = %?", statsName, tableID); err != nil {
return err
}
// Remove the cache item, which is necessary for cases like a cluster with 3 tidb instances, e.g, a, b and c.
@ -106,7 +101,7 @@ func InsertExtendedStats(sctx sessionctx.Context,
// next `Update()` to remove the cached item then.
removeExtendedStatsItem(currentCache, updateStatsCache, tableID, statsName)
const sql = "INSERT INTO mysql.stats_extended(name, type, table_id, column_ids, version, status) VALUES (%?, %?, %?, %?, %?, %?)"
if _, err = sqlExecutor.ExecuteInternal(ctx, sql, statsName, tp, tableID, strColIDs, version, statistics.ExtendedStatsInited); err != nil {
if _, err = util.Exec(sctx, sql, statsName, tp, tableID, strColIDs, version, statistics.ExtendedStatsInited); err != nil {
return err
}
return
@ -124,9 +119,7 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context,
recordHistoricalStatsMeta(tableID, statsVer, StatsMetaHistorySourceExtendedStats)
}
}()
exec := sctx.(sqlexec.RestrictedSQLExecutor)
ctx := util.StatsCtx(context.Background())
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %? and status in (%?, %?)", statsName, tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed)
rows, _, err := util.ExecRows(sctx, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %? and status in (%?, %?)", statsName, tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed)
if err != nil {
return errors.Trace(err)
}
@ -140,14 +133,12 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context,
logutil.BgLogger().Warn("unexpected duplicate extended stats records found", zap.String("name", statsName), zap.Int64("table_id", tableID))
}
sqlExec := exec.(sqlexec.SQLExecutor)
_, err = sqlExec.ExecuteInternal(ctx, "begin pessimistic")
_, err = util.Exec(sctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err1 := util.FinishTransaction(ctx, sqlExec, err)
err1 := util.FinishTransaction(sctx, err)
if err == nil && err1 == nil {
removeExtendedStatsItem(currentCache, updateStatsCache, tableID, statsName)
}
@ -157,11 +148,11 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context,
if err != nil {
return errors.Trace(err)
}
if _, err = sqlExec.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil {
if _, err = util.Exec(sctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil {
return err
}
statsVer = version
if _, err = sqlExec.ExecuteInternal(ctx, "UPDATE mysql.stats_extended SET version = %?, status = %? WHERE name = %? and table_id = %?", version, statistics.ExtendedStatsDeleted, statsName, tableID); err != nil {
if _, err = util.Exec(sctx, "UPDATE mysql.stats_extended SET version = %?, status = %? WHERE name = %? and table_id = %?", version, statistics.ExtendedStatsDeleted, statsName, tableID); err != nil {
return err
}
return nil
@ -170,10 +161,8 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context,
// BuildExtendedStats build extended stats for column groups if needed based on the column samples.
func BuildExtendedStats(sctx sessionctx.Context,
tableID int64, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) (*statistics.ExtendedStatsColl, error) {
ctx := util.StatsCtx(context.Background())
exec := sctx.(sqlexec.RestrictedSQLExecutor)
const sql = "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)"
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, tableID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited)
rows, _, err := util.ExecRows(sctx, sql, tableID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited)
if err != nil {
return nil, errors.Trace(err)
}
@ -293,15 +282,12 @@ func SaveExtendedStatsToStorage(sctx sessionctx.Context,
return nil
}
sqlExec := sctx.(sqlexec.SQLExecutor)
ctx := util.StatsCtx(context.Background())
_, err = sqlExec.ExecuteInternal(ctx, "begin pessimistic")
_, err = util.Exec(sctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = util.FinishTransaction(ctx, sqlExec, err)
err = util.FinishTransaction(sctx, err)
}()
version, err := util.GetStartTS(sctx)
if err != nil {
@ -321,12 +307,12 @@ func SaveExtendedStatsToStorage(sctx sessionctx.Context,
statsStr = item.StringVals
}
// If isLoad is true, it's INSERT; otherwise, it's UPDATE.
if _, err := sqlExec.ExecuteInternal(ctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, %?, %?)", name, item.Tp, tableID, strColIDs, statsStr, version, statistics.ExtendedStatsAnalyzed); err != nil {
if _, err := util.Exec(sctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, %?, %?)", name, item.Tp, tableID, strColIDs, statsStr, version, statistics.ExtendedStatsAnalyzed); err != nil {
return err
}
}
if !isLoad {
if _, err := sqlExec.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil {
if _, err := util.Exec(sctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil {
return err
}
statsVer = version

View File

@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle/cache"
@ -43,7 +44,6 @@ const gcLastTSVarName = "tidb_stats_gc_last_ts"
// For dropped tables, we will first update their version
// so that other tidb could know that table is deleted.
func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err error) {
ctx := context.Background()
// To make sure that all the deleted tables' schema and stats info have been acknowledged to all tidb,
// we only garbage collect version before 10 lease.
lease := mathutil.Max(h.Lease(), ddlLease)
@ -55,7 +55,7 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err
// Get the last gc time.
gcVer := now - offset
lastGC, err := h.GetLastGCTimestamp(ctx)
lastGC, err := h.GetLastGCTimestamp()
if err != nil {
return err
}
@ -63,10 +63,10 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err
if err != nil {
return
}
err = h.writeGCTimestampToKV(ctx, gcVer)
err = h.writeGCTimestampToKV(gcVer)
}()
rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_meta where version >= %? and version < %?", lastGC, gcVer)
rows, _, err := h.execRows("select table_id from mysql.stats_meta where version >= %? and version < %?", lastGC, gcVer)
if err != nil {
return errors.Trace(err)
}
@ -92,8 +92,8 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err
}
// GetLastGCTimestamp loads the last gc time from mysql.tidb.
func (h *Handle) GetLastGCTimestamp(ctx context.Context) (uint64, error) {
rows, _, err := h.execRestrictedSQL(ctx, "SELECT HIGH_PRIORITY variable_value FROM mysql.tidb WHERE variable_name=%?", gcLastTSVarName)
func (h *Handle) GetLastGCTimestamp() (uint64, error) {
rows, _, err := h.execRows("SELECT HIGH_PRIORITY variable_value FROM mysql.tidb WHERE variable_name=%?", gcLastTSVarName)
if err != nil {
return 0, errors.Trace(err)
}
@ -108,8 +108,8 @@ func (h *Handle) GetLastGCTimestamp(ctx context.Context) (uint64, error) {
return lastGcTS, nil
}
func (h *Handle) writeGCTimestampToKV(ctx context.Context, newTS uint64) error {
_, _, err := h.execRestrictedSQL(ctx,
func (h *Handle) writeGCTimestampToKV(newTS uint64) error {
_, _, err := h.execRows(
"insert into mysql.tidb (variable_name, variable_value) values (%?, %?) on duplicate key update variable_value = %?",
gcLastTSVarName,
newTS,
@ -119,15 +119,14 @@ func (h *Handle) writeGCTimestampToKV(ctx context.Context, newTS uint64) error {
}
func (h *Handle) gcTableStats(is infoschema.InfoSchema, physicalID int64) error {
ctx := context.Background()
rows, _, err := h.execRestrictedSQL(ctx, "select is_index, hist_id from mysql.stats_histograms where table_id = %?", physicalID)
rows, _, err := h.execRows("select is_index, hist_id from mysql.stats_histograms where table_id = %?", physicalID)
if err != nil {
return errors.Trace(err)
}
// The table has already been deleted in stats and acknowledged to all tidb,
// we can safely remove the meta info now.
if len(rows) == 0 {
_, _, err = h.execRestrictedSQL(ctx, "delete from mysql.stats_meta where table_id = %?", physicalID)
_, _, err = h.execRows("delete from mysql.stats_meta where table_id = %?", physicalID)
if err != nil {
return errors.Trace(err)
}
@ -164,7 +163,7 @@ func (h *Handle) gcTableStats(is infoschema.InfoSchema, physicalID int64) error
}
}
// Mark records in mysql.stats_extended as `deleted`.
rows, _, err = h.execRestrictedSQL(ctx, "select name, column_ids from mysql.stats_extended where table_id = %? and status in (%?, %?)", physicalID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited)
rows, _, err = h.execRows("select name, column_ids from mysql.stats_extended where table_id = %? and status in (%?, %?)", physicalID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited)
if err != nil {
return errors.Trace(err)
}
@ -209,9 +208,9 @@ func (h *Handle) ClearOutdatedHistoryStats() error {
return err
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
sctx := se.(sessionctx.Context)
sql := "select count(*) from mysql.stats_meta_history use index (idx_create_time) where create_time <= NOW() - INTERVAL %? SECOND"
rs, err := exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
rs, err := util.Exec(sctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
if err != nil {
return err
}
@ -226,12 +225,12 @@ func (h *Handle) ClearOutdatedHistoryStats() error {
count := rows[0].GetInt64(0)
if count > 0 {
sql = "delete from mysql.stats_meta_history use index (idx_create_time) where create_time <= NOW() - INTERVAL %? SECOND"
_, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
_, err = util.Exec(sctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
if err != nil {
return err
}
sql = "delete from mysql.stats_history use index (idx_create_time) where create_time <= NOW() - INTERVAL %? SECOND"
_, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
_, err = util.Exec(sctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
logutil.BgLogger().Info("clear outdated historical stats")
return err
}
@ -244,23 +243,22 @@ func (h *Handle) gcHistoryStatsFromKV(physicalID int64) error {
return err
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := util.StatsCtx(context.Background())
sctx := se.(sessionctx.Context)
_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
_, err = util.Exec(sctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = util.FinishTransaction(ctx, exec, err)
err = util.FinishTransaction(sctx, err)
}()
sql := "delete from mysql.stats_history where table_id = %?"
_, err = exec.ExecuteInternal(ctx, sql, physicalID)
_, err = util.Exec(sctx, sql, physicalID)
if err != nil {
return errors.Trace(err)
}
sql = "delete from mysql.stats_meta_history where table_id = %?"
_, err = exec.ExecuteInternal(ctx, sql, physicalID)
_, err = util.Exec(sctx, sql, physicalID)
return err
}
@ -271,43 +269,42 @@ func (h *Handle) deleteHistStatsFromKV(physicalID int64, histID int64, isIndex i
return err
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := util.StatsCtx(context.Background())
sctx := se.(sessionctx.Context)
_, err = exec.ExecuteInternal(ctx, "begin")
_, err = util.Exec(sctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = util.FinishTransaction(ctx, exec, err)
err = util.FinishTransaction(sctx, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
return errors.Trace(err)
}
// First of all, we update the version. If this table doesn't exist, it won't have any problem. Because we cannot delete anything.
if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, physicalID); err != nil {
if _, err = util.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, physicalID); err != nil {
return err
}
// delete histogram meta
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil {
if _, err = util.Exec(sctx, "delete from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil {
return err
}
// delete top n data
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil {
if _, err = util.Exec(sctx, "delete from mysql.stats_top_n where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil {
return err
}
// delete all buckets
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil {
if _, err = util.Exec(sctx, "delete from mysql.stats_buckets where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil {
return err
}
// delete all fm sketch
if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil {
if _, err := util.Exec(sctx, "delete from mysql.stats_fm_sketch where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil {
return err
}
if isIndex == 0 {
// delete the record in mysql.column_stats_usage
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.column_stats_usage where table_id = %? and column_id = %?", physicalID, histID); err != nil {
if _, err = util.Exec(sctx, "delete from mysql.column_stats_usage where table_id = %? and column_id = %?", physicalID, histID); err != nil {
return err
}
}
@ -322,14 +319,13 @@ func (h *Handle) DeleteTableStatsFromKV(statsIDs []int64) (err error) {
return err
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := util.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin")
sctx := se.(sessionctx.Context)
_, err = util.Exec(sctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = util.FinishTransaction(ctx, exec, err)
err = util.FinishTransaction(sctx, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
@ -337,31 +333,31 @@ func (h *Handle) DeleteTableStatsFromKV(statsIDs []int64) (err error) {
}
for _, statsID := range statsIDs {
// We only update the version so that other tidb will know that this table is deleted.
if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, statsID); err != nil {
if _, err = util.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, statsID); err != nil {
return err
}
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_histograms where table_id = %?", statsID); err != nil {
if _, err = util.Exec(sctx, "delete from mysql.stats_histograms where table_id = %?", statsID); err != nil {
return err
}
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %?", statsID); err != nil {
if _, err = util.Exec(sctx, "delete from mysql.stats_buckets where table_id = %?", statsID); err != nil {
return err
}
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %?", statsID); err != nil {
if _, err = util.Exec(sctx, "delete from mysql.stats_top_n where table_id = %?", statsID); err != nil {
return err
}
if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_extended set version = %?, status = %? where table_id = %? and status in (%?, %?)", startTS, statistics.ExtendedStatsDeleted, statsID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited); err != nil {
if _, err = util.Exec(sctx, "update mysql.stats_extended set version = %?, status = %? where table_id = %? and status in (%?, %?)", startTS, statistics.ExtendedStatsDeleted, statsID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited); err != nil {
return err
}
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %?", statsID); err != nil {
if _, err = util.Exec(sctx, "delete from mysql.stats_fm_sketch where table_id = %?", statsID); err != nil {
return err
}
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.column_stats_usage where table_id = %?", statsID); err != nil {
if _, err = util.Exec(sctx, "delete from mysql.column_stats_usage where table_id = %?", statsID); err != nil {
return err
}
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.analyze_options where table_id = %?", statsID); err != nil {
if _, err = util.Exec(sctx, "delete from mysql.analyze_options where table_id = %?", statsID); err != nil {
return err
}
if _, err = exec.ExecuteInternal(ctx, lockstats.DeleteLockSQL, statsID); err != nil {
if _, err = util.Exec(sctx, lockstats.DeleteLockSQL, statsID); err != nil {
return err
}
}
@ -374,16 +370,15 @@ func (h *Handle) removeDeletedExtendedStats(version uint64) (err error) {
return err
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := util.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
sctx := se.(sessionctx.Context)
_, err = util.Exec(sctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = util.FinishTransaction(ctx, exec, err)
err = util.FinishTransaction(sctx, err)
}()
const sql = "delete from mysql.stats_extended where status = %? and version < %?"
_, err = exec.ExecuteInternal(ctx, sql, statistics.ExtendedStatsDeleted, version)
_, err = util.Exec(sctx, sql, statistics.ExtendedStatsDeleted, version)
return
}

View File

@ -15,7 +15,6 @@
package handle
import (
"context"
"encoding/json"
"fmt"
"math"
@ -106,31 +105,12 @@ type Handle struct {
lease atomic2.Duration
}
func (h *Handle) withRestrictedSQLExecutor(ctx context.Context, fn func(context.Context, sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error)) ([]chunk.Row, []*ast.ResultField, error) {
se, err := h.pool.Get()
if err != nil {
return nil, nil, errors.Trace(err)
}
defer h.pool.Put(se)
exec := se.(sqlexec.RestrictedSQLExecutor)
return fn(ctx, exec)
}
func (h *Handle) execRestrictedSQL(ctx context.Context, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
return h.withRestrictedSQLExecutor(util.StatsCtx(ctx), func(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error) {
return exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, params...)
})
}
func (h *Handle) execRestrictedSQLWithSnapshot(ctx context.Context, sql string, snapshot uint64, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
return h.withRestrictedSQLExecutor(util.StatsCtx(ctx), func(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error) {
optFuncs := []sqlexec.OptionFuncAlias{
sqlexec.ExecOptionWithSnapshot(snapshot),
sqlexec.ExecOptionUseCurSession,
}
return exec.ExecRestrictedSQL(ctx, optFuncs, sql, params...)
func (h *Handle) execRows(sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, rerr error) {
_ = h.callWithSCtx(func(sctx sessionctx.Context) error {
rows, fields, rerr = util.ExecRows(sctx, sql, args...)
return nil
})
return
}
// Clear the statsCache, only for test.
@ -243,8 +223,7 @@ func (h *Handle) Update(is infoschema.InfoSchema) error {
} else {
lastVersion = 0
}
ctx := util.StatsCtx(context.Background())
rows, _, err := h.execRestrictedSQL(ctx, "SELECT version, table_id, modify_count, count from mysql.stats_meta where version > %? order by version", lastVersion)
rows, _, err := h.execRows("SELECT version, table_id, modify_count, count from mysql.stats_meta where version > %? order by version", lastVersion)
if err != nil {
return errors.Trace(err)
}
@ -680,8 +659,7 @@ type colStatsTimeInfo struct {
// getDisableColumnTrackingTime reads the value of tidb_disable_column_tracking_time from mysql.tidb if it exists.
func (h *Handle) getDisableColumnTrackingTime() (*time.Time, error) {
ctx := util.StatsCtx(context.Background())
rows, fields, err := h.execRestrictedSQL(ctx, "SELECT variable_value FROM %n.%n WHERE variable_name = %?", mysql.SystemDB, mysql.TiDBTable, variable.TiDBDisableColumnTrackingTime)
rows, fields, err := h.execRows("SELECT variable_value FROM %n.%n WHERE variable_name = %?", mysql.SystemDB, mysql.TiDBTable, variable.TiDBDisableColumnTrackingTime)
if err != nil {
return nil, err
}
@ -707,9 +685,8 @@ func (h *Handle) LoadColumnStatsUsage(loc *time.Location) (map[model.TableItemID
if err != nil {
return nil, errors.Trace(err)
}
ctx := util.StatsCtx(context.Background())
// Since we use another session from session pool to read mysql.column_stats_usage, which may have different @@time_zone, so we do time zone conversion here.
rows, _, err := h.execRestrictedSQL(ctx, "SELECT table_id, column_id, CONVERT_TZ(last_used_at, @@TIME_ZONE, '+00:00'), CONVERT_TZ(last_analyzed_at, @@TIME_ZONE, '+00:00') FROM mysql.column_stats_usage")
rows, _, err := h.execRows("SELECT table_id, column_id, CONVERT_TZ(last_used_at, @@TIME_ZONE, '+00:00'), CONVERT_TZ(last_analyzed_at, @@TIME_ZONE, '+00:00') FROM mysql.column_stats_usage")
if err != nil {
return nil, errors.Trace(err)
}
@ -747,9 +724,8 @@ func (h *Handle) LoadColumnStatsUsage(loc *time.Location) (map[model.TableItemID
// CollectColumnsInExtendedStats returns IDs of the columns involved in extended stats.
func (h *Handle) CollectColumnsInExtendedStats(tableID int64) ([]int64, error) {
ctx := util.StatsCtx(context.Background())
const sql = "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)"
rows, _, err := h.execRestrictedSQL(ctx, sql, tableID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited)
rows, _, err := h.execRows(sql, tableID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited)
if err != nil {
return nil, errors.Trace(err)
}
@ -776,8 +752,7 @@ func (h *Handle) GetPredicateColumns(tableID int64) ([]int64, error) {
if err != nil {
return nil, errors.Trace(err)
}
ctx := util.StatsCtx(context.Background())
rows, _, err := h.execRestrictedSQL(ctx, "SELECT column_id, CONVERT_TZ(last_used_at, @@TIME_ZONE, '+00:00') FROM mysql.column_stats_usage WHERE table_id = %? AND last_used_at IS NOT NULL", tableID)
rows, _, err := h.execRows("SELECT column_id, CONVERT_TZ(last_used_at, @@TIME_ZONE, '+00:00') FROM mysql.column_stats_usage WHERE table_id = %? AND last_used_at IS NOT NULL", tableID)
if err != nil {
return nil, errors.Trace(err)
}
@ -805,7 +780,6 @@ const maxColumnSize = 6 << 20
// RecordHistoricalStatsToStorage records the given table's stats data to mysql.stats_history
func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) {
ctx := util.StatsCtx(context.Background())
var js *storage.JSONTable
var err error
if isPartition {
@ -837,20 +811,20 @@ func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.
return 0, err
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
sctx := se.(sessionctx.Context)
_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
_, err = util.Exec(sctx, "begin pessimistic")
if err != nil {
return version, errors.Trace(err)
}
defer func() {
err = util.FinishTransaction(ctx, exec, err)
err = util.FinishTransaction(sctx, err)
}()
ts := time.Now().Format("2006-01-02 15:04:05.999999")
const sql = "INSERT INTO mysql.stats_history(table_id, stats_data, seq_no, version, create_time) VALUES (%?, %?, %?, %?, %?)"
for i := 0; i < len(blocks); i++ {
if _, err := exec.ExecuteInternal(ctx, sql, physicalID, blocks[i], i, version, ts); err != nil {
if _, err := util.Exec(sctx, sql, physicalID, blocks[i], i, version, ts); err != nil {
return version, errors.Trace(err)
}
}
@ -878,20 +852,19 @@ func (h *Handle) InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, p
return err
}
defer h.pool.Put(se)
exec := se.(sqlexec.RestrictedSQLExecutor)
ctx := util.StatsCtx(context.Background())
sctx := se.(sessionctx.Context)
jobInfo := job.JobInfo
const textMaxLength = 65535
if len(jobInfo) > textMaxLength {
jobInfo = jobInfo[:textMaxLength]
}
const insertJob = "INSERT INTO mysql.analyze_jobs (table_schema, table_name, partition_name, job_info, state, instance, process_id) VALUES (%?, %?, %?, %?, %?, %?, %?)"
_, _, err = exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, insertJob, job.DBName, job.TableName, job.PartitionName, jobInfo, statistics.AnalyzePending, instance, procID)
_, _, err = util.ExecRows(sctx, insertJob, job.DBName, job.TableName, job.PartitionName, jobInfo, statistics.AnalyzePending, instance, procID)
if err != nil {
return err
}
const getJobID = "SELECT LAST_INSERT_ID()"
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, getJobID)
rows, _, err := util.ExecRows(sctx, getJobID)
if err != nil {
return err
}
@ -913,8 +886,7 @@ func (h *Handle) InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, p
// DeleteAnalyzeJobs deletes the analyze jobs whose update time is earlier than updateTime.
func (h *Handle) DeleteAnalyzeJobs(updateTime time.Time) error {
ctx := util.StatsCtx(context.Background())
_, _, err := h.execRestrictedSQL(ctx, "DELETE FROM mysql.analyze_jobs WHERE update_time < CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)", updateTime.UTC().Format(types.TimeFormat))
_, _, err := h.execRows("DELETE FROM mysql.analyze_jobs WHERE update_time < CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)", updateTime.UTC().Format(types.TimeFormat))
return err
}

View File

@ -9,7 +9,6 @@ go_library(
"//sessionctx",
"//statistics/handle/cache",
"//statistics/handle/util",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
],
)

View File

@ -15,13 +15,10 @@
package history
import (
"context"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics/handle/cache"
"github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/util/sqlexec"
)
// RecordHistoricalStatsMeta records the historical stats meta.
@ -32,10 +29,7 @@ func RecordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version u
if !sctx.GetSessionVars().EnableHistoricalStats {
return nil
}
ctx := util.StatsCtx(context.Background())
exec := sctx.(sqlexec.SQLExecutor)
rexec := sctx.(sqlexec.RestrictedSQLExecutor)
rows, _, err := rexec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", tableID, version)
rows, _, err := util.ExecRows(sctx, "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", tableID, version)
if err != nil {
return errors.Trace(err)
}
@ -44,16 +38,16 @@ func RecordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version u
}
modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)
_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
_, err = util.Exec(sctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = util.FinishTransaction(ctx, exec, err)
err = util.FinishTransaction(sctx, err)
}()
const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time) VALUES (%?, %?, %?, %?, %?, NOW())"
if _, err := exec.ExecuteInternal(ctx, sql, tableID, modifyCount, count, version, source); err != nil {
if _, err := util.Exec(sctx, sql, tableID, modifyCount, count, version, source); err != nil {
return errors.Trace(err)
}
cache.TableRowStatsCache.Invalidate(tableID)

View File

@ -596,14 +596,13 @@ func loadNeededIndexHistograms(reader *StatsReader, statsCache *cache.StatsCache
// StatsMetaByTableIDFromStorage gets the stats meta of a table from storage.
func StatsMetaByTableIDFromStorage(sctx sessionctx.Context, tableID int64, snapshot uint64) (version uint64, modifyCount, count int64, err error) {
ctx := util.StatsCtx(context.Background())
var rows []chunk.Row
exec := sctx.(sqlexec.RestrictedSQLExecutor)
if snapshot == 0 {
rows, _, err = exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
rows, _, err = util.ExecRows(sctx,
"SELECT version, modify_count, count from mysql.stats_meta where table_id = %? order by version", tableID)
} else {
rows, _, err = exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession, sqlexec.ExecOptionWithSnapshot(snapshot)},
rows, _, err = util.ExecWithOpts(sctx,
[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionWithSnapshot(snapshot), sqlexec.ExecOptionUseCurSession},
"SELECT version, modify_count, count from mysql.stats_meta where table_id = %? order by version", tableID)
}
if err != nil || len(rows) == 0 {

View File

@ -24,7 +24,6 @@ import (
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle/cache"
"github.com/pingcap/tidb/statistics/handle/util"
@ -41,7 +40,7 @@ const batchInsertSize = 10
// maxInsertLength is the length limit for internal insert SQL.
const maxInsertLength = 1024 * 1024
func saveTopNToStorage(ctx context.Context, exec sqlexec.SQLExecutor, tableID int64, isIndex int, histID int64, topN *statistics.TopN) error {
func saveTopNToStorage(sctx sessionctx.Context, tableID int64, isIndex int, histID int64, topN *statistics.TopN) error {
if topN == nil {
return nil
}
@ -65,17 +64,18 @@ func saveTopNToStorage(ctx context.Context, exec sqlexec.SQLExecutor, tableID in
sql.WriteString(val)
}
i = end
if _, err := exec.ExecuteInternal(ctx, sql.String()); err != nil {
if _, err := util.Exec(sctx, sql.String()); err != nil {
return err
}
}
return nil
}
func saveBucketsToStorage(ctx context.Context, exec sqlexec.SQLExecutor, sc *stmtctx.StatementContext, tableID int64, isIndex int, hg *statistics.Histogram) (lastAnalyzePos []byte, err error) {
func saveBucketsToStorage(sctx sessionctx.Context, tableID int64, isIndex int, hg *statistics.Histogram) (lastAnalyzePos []byte, err error) {
if hg == nil {
return
}
sc := sctx.GetSessionVars().StmtCtx
for i := 0; i < len(hg.Buckets); {
end := i + batchInsertSize
if end > len(hg.Buckets) {
@ -113,7 +113,7 @@ func saveBucketsToStorage(ctx context.Context, exec sqlexec.SQLExecutor, sc *stm
sql.WriteString(val)
}
i = end
if _, err = exec.ExecuteInternal(ctx, sql.String()); err != nil {
if _, err = util.Exec(sctx, sql.String()); err != nil {
return
}
}
@ -139,13 +139,12 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
}
}()
ctx := util.StatsCtx(context.Background())
exec := sctx.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
_, err = util.Exec(sctx, "begin pessimistic")
if err != nil {
return err
}
defer func() {
err = util.FinishTransaction(ctx, exec, err)
err = util.FinishTransaction(sctx, err)
}()
txn, err := sctx.Txn(true)
if err != nil {
@ -155,7 +154,7 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
// 1. Save mysql.stats_meta.
var rs sqlexec.RecordSet
// Lock this row to prevent writing of concurrent analyze.
rs, err = exec.ExecuteInternal(ctx, "select snapshot, count, modify_count from mysql.stats_meta where table_id = %? for update", tableID)
rs, err = util.Exec(sctx, "select snapshot, count, modify_count from mysql.stats_meta where table_id = %? for update", tableID)
if err != nil {
return err
}
@ -192,7 +191,7 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
snapShot = 0
count = 0
}
if _, err = exec.ExecuteInternal(ctx,
if _, err = util.Exec(sctx,
"replace into mysql.stats_meta (version, table_id, count, snapshot) values (%?, %?, %?, %?)",
version,
tableID,
@ -205,7 +204,7 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
} else if results.ForMVIndex {
// 1-2. There's already an existing record for this table, and we are handling stats for mv index now.
// In this case, we only update the version. See comments for AnalyzeResults.ForMVIndex for more details.
if _, err = exec.ExecuteInternal(ctx,
if _, err = util.Exec(sctx,
"update mysql.stats_meta set version=%? where table_id=%?",
version,
tableID,
@ -245,7 +244,7 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
zap.Int64("results.Count", results.Count),
zap.Int64("count", cnt))
}
if _, err = exec.ExecuteInternal(ctx,
if _, err = util.Exec(sctx,
"update mysql.stats_meta set version=%?, modify_count=%?, count=%?, snapshot=%? where table_id=%?",
version,
modifyCnt,
@ -278,40 +277,39 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
return err
}
// Delete outdated data
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil {
if _, err = util.Exec(sctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil {
return err
}
if err = saveTopNToStorage(ctx, exec, tableID, result.IsIndex, hg.ID, result.TopNs[i]); err != nil {
if err = saveTopNToStorage(sctx, tableID, result.IsIndex, hg.ID, result.TopNs[i]); err != nil {
return err
}
if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil {
if _, err := util.Exec(sctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil {
return err
}
if fmSketch != nil && needDumpFMS {
if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_fm_sketch (table_id, is_index, hist_id, value) values (%?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, fmSketch); err != nil {
if _, err = util.Exec(sctx, "insert into mysql.stats_fm_sketch (table_id, is_index, hist_id, value) values (%?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, fmSketch); err != nil {
return err
}
}
if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)",
if _, err = util.Exec(sctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)",
tableID, result.IsIndex, hg.ID, hg.NDV, version, hg.NullCount, cmSketch, hg.TotColSize, results.StatsVer, statistics.AnalyzeFlag, hg.Correlation); err != nil {
return err
}
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil {
if _, err = util.Exec(sctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil {
return err
}
sc := sctx.GetSessionVars().StmtCtx
var lastAnalyzePos []byte
lastAnalyzePos, err = saveBucketsToStorage(ctx, exec, sc, tableID, result.IsIndex, hg)
lastAnalyzePos, err = saveBucketsToStorage(sctx, tableID, result.IsIndex, hg)
if err != nil {
return err
}
if len(lastAnalyzePos) > 0 {
if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, result.IsIndex, hg.ID); err != nil {
if _, err = util.Exec(sctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, result.IsIndex, hg.ID); err != nil {
return err
}
}
if result.IsIndex == 0 {
if _, err = exec.ExecuteInternal(ctx, "insert into mysql.column_stats_usage (table_id, column_id, last_analyzed_at) values(%?, %?, current_timestamp()) on duplicate key update last_analyzed_at = values(last_analyzed_at)", tableID, hg.ID); err != nil {
if _, err = util.Exec(sctx, "insert into mysql.column_stats_usage (table_id, column_id, last_analyzed_at) values(%?, %?, current_timestamp()) on duplicate key update last_analyzed_at = values(last_analyzed_at)", tableID, hg.ID); err != nil {
return err
}
}
@ -336,7 +334,7 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
case ast.StatsTypeDependency:
statsStr = item.StringVals
}
if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, %?, %?)", name, item.Tp, tableID, strColIDs, statsStr, version, statistics.ExtendedStatsAnalyzed); err != nil {
if _, err = util.Exec(sctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, %?, %?)", name, item.Tp, tableID, strColIDs, statsStr, version, statistics.ExtendedStatsAnalyzed); err != nil {
return err
}
}
@ -358,14 +356,12 @@ func SaveStatsToStorage(sctx sessionctx.Context,
}
}()
exec := sctx.(sqlexec.SQLExecutor)
ctx := util.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
_, err = util.Exec(sctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = util.FinishTransaction(ctx, exec, err)
err = util.FinishTransaction(sctx, err)
}()
version, err := util.GetStartTS(sctx)
if err != nil {
@ -374,10 +370,10 @@ func SaveStatsToStorage(sctx sessionctx.Context,
// If the count is less than 0, then we do not want to update the modify count and count.
if count >= 0 {
_, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count, modify_count) values (%?, %?, %?, %?)", version, tableID, count, modifyCount)
_, err = util.Exec(sctx, "replace into mysql.stats_meta (version, table_id, count, modify_count) values (%?, %?, %?, %?)", version, tableID, count, modifyCount)
cache.TableRowStatsCache.Invalidate(tableID)
} else {
_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %?", version, tableID)
_, err = util.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %?", version, tableID)
}
if err != nil {
return err
@ -388,39 +384,38 @@ func SaveStatsToStorage(sctx sessionctx.Context,
return err
}
// Delete outdated data
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
if _, err = util.Exec(sctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
return err
}
if err = saveTopNToStorage(ctx, exec, tableID, isIndex, hg.ID, topN); err != nil {
if err = saveTopNToStorage(sctx, tableID, isIndex, hg.ID, topN); err != nil {
return err
}
if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
if _, err := util.Exec(sctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
return err
}
flag := 0
if isAnalyzed == 1 {
flag = statistics.AnalyzeFlag
}
if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)",
if _, err = util.Exec(sctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)",
tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, cmSketch, hg.TotColSize, statsVersion, flag, hg.Correlation); err != nil {
return err
}
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
if _, err = util.Exec(sctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
return err
}
sc := sctx.GetSessionVars().StmtCtx
var lastAnalyzePos []byte
lastAnalyzePos, err = saveBucketsToStorage(ctx, exec, sc, tableID, isIndex, hg)
lastAnalyzePos, err = saveBucketsToStorage(sctx, tableID, isIndex, hg)
if err != nil {
return err
}
if isAnalyzed == 1 && len(lastAnalyzePos) > 0 {
if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, isIndex, hg.ID); err != nil {
if _, err = util.Exec(sctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, isIndex, hg.ID); err != nil {
return err
}
}
if updateAnalyzeTime && isIndex == 0 {
if _, err = exec.ExecuteInternal(ctx, "insert into mysql.column_stats_usage (table_id, column_id, last_analyzed_at) values(%?, %?, current_timestamp()) on duplicate key update last_analyzed_at = current_timestamp()", tableID, hg.ID); err != nil {
if _, err = util.Exec(sctx, "insert into mysql.column_stats_usage (table_id, column_id, last_analyzed_at) values(%?, %?, current_timestamp()) on duplicate key update last_analyzed_at = current_timestamp()", tableID, hg.ID); err != nil {
return err
}
}
@ -439,20 +434,18 @@ func SaveMetaToStorage(
}
}()
exec := sctx.(sqlexec.SQLExecutor)
ctx := util.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin")
_, err = util.Exec(sctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = util.FinishTransaction(ctx, exec, err)
err = util.FinishTransaction(sctx, err)
}()
version, err := util.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}
_, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count, modify_count) values (%?, %?, %?, %?)", version, tableID, count, modifyCount)
_, err = util.Exec(sctx, "replace into mysql.stats_meta (version, table_id, count, modify_count) values (%?, %?, %?, %?)", version, tableID, count, modifyCount)
statsVer = version
cache.TableRowStatsCache.Invalidate(tableID)
return err

View File

@ -16,7 +16,6 @@ package handle
import (
"cmp"
"context"
"fmt"
"slices"
"strings"
@ -284,15 +283,13 @@ func (h *Handle) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableI
return false, err
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
sctx := se.(sessionctx.Context)
ctx := utilstats.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin")
_, err = utilstats.Exec(sctx, "begin")
if err != nil {
return false, errors.Trace(err)
}
defer func() {
err = utilstats.FinishTransaction(ctx, exec, err)
err = utilstats.FinishTransaction(sctx, err)
}()
statsVersion, err = getSessionTxnStartTS(se)
@ -327,7 +324,7 @@ func (h *Handle) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableI
isPartitionLocked = true
}
tableOrPartitionLocked := isTableLocked || isPartitionLocked
if err = updateStatsMeta(ctx, exec, statsVersion, delta,
if err = updateStatsMeta(sctx, statsVersion, delta,
physicalTableID, tableOrPartitionLocked); err != nil {
return
}
@ -336,7 +333,7 @@ func (h *Handle) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableI
// We will update its global-stats when the partition is unlocked.
if isTableLocked || !isPartitionLocked {
// If it's a partitioned table and its global-stats exists, update its count and modify_count as well.
if err = updateStatsMeta(ctx, exec, statsVersion, delta, tableID, isTableLocked); err != nil {
if err = updateStatsMeta(sctx, statsVersion, delta, tableID, isTableLocked); err != nil {
return
}
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
@ -348,7 +345,7 @@ func (h *Handle) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableI
if _, ok := lockedTables[physicalTableID]; ok {
isTableLocked = true
}
if err = updateStatsMeta(ctx, exec, statsVersion, delta,
if err = updateStatsMeta(sctx, statsVersion, delta,
physicalTableID, isTableLocked); err != nil {
return
}
@ -360,8 +357,7 @@ func (h *Handle) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableI
}
func updateStatsMeta(
ctx context.Context,
exec sqlexec.SQLExecutor,
sctx sessionctx.Context,
startTS uint64,
delta variable.TableDelta,
id int64,
@ -369,21 +365,21 @@ func updateStatsMeta(
) (err error) {
if isLocked {
if delta.Delta < 0 {
_, err = exec.ExecuteInternal(ctx, "update mysql.stats_table_locked set version = %?, count = count - %?, modify_count = modify_count + %? where table_id = %? and count >= %?",
_, err = utilstats.Exec(sctx, "update mysql.stats_table_locked set version = %?, count = count - %?, modify_count = modify_count + %? where table_id = %? and count >= %?",
startTS, -delta.Delta, delta.Count, id, -delta.Delta)
} else {
_, err = exec.ExecuteInternal(ctx, "update mysql.stats_table_locked set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?",
_, err = utilstats.Exec(sctx, "update mysql.stats_table_locked set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?",
startTS, delta.Delta, delta.Count, id)
}
} else {
if delta.Delta < 0 {
// use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta.
_, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, 0) on duplicate key "+
_, err = utilstats.Exec(sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, 0) on duplicate key "+
"update version = values(version), modify_count = modify_count + values(modify_count), count = if(count > %?, count - %?, 0)",
startTS, id, delta.Count, -delta.Delta, -delta.Delta)
} else {
// use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta.
_, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+
_, err = utilstats.Exec(sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+
"update version = values(version), modify_count = modify_count + values(modify_count), count = count + values(count)", startTS,
id, delta.Count, delta.Delta)
}
@ -406,10 +402,9 @@ func (h *Handle) dumpTableStatColSizeToKV(id int64, delta variable.TableDelta) e
if len(values) == 0 {
return nil
}
ctx := utilstats.StatsCtx(context.Background())
sql := fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, tot_col_size) "+
"values %s on duplicate key update tot_col_size = tot_col_size + values(tot_col_size)", strings.Join(values, ","))
_, _, err := h.execRestrictedSQL(ctx, sql)
_, _, err := h.execRows(sql)
return errors.Trace(err)
}
@ -454,7 +449,7 @@ func (h *Handle) DumpColStatsUsageToKV() error {
}
}
sqlexec.MustFormatSQL(sql, " ON DUPLICATE KEY UPDATE last_used_at = CASE WHEN last_used_at IS NULL THEN VALUES(last_used_at) ELSE GREATEST(last_used_at, VALUES(last_used_at)) END")
if _, _, err := h.execRestrictedSQL(context.Background(), sql.String()); err != nil {
if _, _, err := h.execRows(sql.String()); err != nil {
return errors.Trace(err)
}
for j := i; j < end; j++ {

View File

@ -15,7 +15,6 @@
package usage
import (
"context"
"strings"
"sync"
"time"
@ -146,15 +145,8 @@ func sweepIdxUsageList(listHead *SessionIndexUsageCollector) indexUsageMap {
// batchInsertSize is the batch size used by internal SQL to insert values to some system table.
const batchInsertSize = 10
var (
// useCurrentSession to make sure the sql is executed in current session.
useCurrentSession = []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}
)
// DumpIndexUsageToKV will dump in-memory index usage information to KV.
func DumpIndexUsageToKV(sctx sessionctx.Context, listHead *SessionIndexUsageCollector) error {
ctx := util.StatsCtx(context.Background())
exec := sctx.(sqlexec.RestrictedSQLExecutor)
mapper := sweepIdxUsageList(listHead)
type FullIndexUsageInformation struct {
information IndexUsageInformation
@ -180,7 +172,7 @@ func DumpIndexUsageToKV(sctx sessionctx.Context, listHead *SessionIndexUsageColl
}
}
sqlexec.MustFormatSQL(sql, "on duplicate key update query_count=query_count+values(query_count),rows_selected=rows_selected+values(rows_selected),last_used_at=greatest(last_used_at, values(last_used_at))")
if _, _, err := exec.ExecRestrictedSQL(ctx, useCurrentSession, sql.String()); err != nil {
if _, _, err := util.ExecRows(sctx, sql.String()); err != nil {
return errors.Trace(err)
}
}
@ -193,8 +185,6 @@ func GCIndexUsageOnKV(sctx sessionctx.Context) error {
// We periodically delete the usage information of non-existent indexes through information_schema.tidb_indexes.
// This sql will delete the usage information of those indexes that not in information_schema.tidb_indexes.
sql := `delete from mysql.SCHEMA_INDEX_USAGE as stats where stats.index_id not in (select idx.index_id from information_schema.tidb_indexes as idx)`
ctx := util.StatsCtx(context.Background())
exec := sctx.(sqlexec.RestrictedSQLExecutor)
_, _, err := exec.ExecRestrictedSQL(ctx, useCurrentSession, sql)
_, _, err := util.ExecRows(sctx, sql)
return err
}

View File

@ -32,15 +32,11 @@ func StatsCtx(ctx context.Context) context.Context {
}
// FinishTransaction will execute `commit` when error is nil, otherwise `rollback`.
func FinishTransaction(ctx context.Context, exec interface{}, err error) error {
sqlExec, ok := exec.(sqlexec.SQLExecutor)
if !ok {
return errors.Errorf("invalid sql executor")
}
func FinishTransaction(sctx sessionctx.Context, err error) error {
if err == nil {
_, err = sqlExec.ExecuteInternal(ctx, "commit")
_, err = Exec(sctx, "commit")
} else {
_, err1 := sqlExec.ExecuteInternal(ctx, "rollback")
_, err1 := Exec(sctx, "rollback")
terror.Log(errors.Trace(err1))
}
return errors.Trace(err)
@ -55,11 +51,30 @@ func GetStartTS(sctx sessionctx.Context) (uint64, error) {
return txn.StartTS(), nil
}
// Read is a helper function to execute sql and return rows and fields.
func Read(exec interface{}, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) {
sqlExec, ok := exec.(sqlexec.RestrictedSQLExecutor)
// Exec is a helper function to execute sql and return RecordSet.
func Exec(sctx sessionctx.Context, sql string, args ...interface{}) (sqlexec.RecordSet, error) {
sqlExec, ok := sctx.(sqlexec.SQLExecutor)
if !ok {
return nil, errors.Errorf("invalid sql executor")
}
// TODO: use RestrictedSQLExecutor + ExecOptionUseCurSession instead of SQLExecutor
return sqlExec.ExecuteInternal(StatsCtx(context.Background()), sql, args...)
}
// ExecRows is a helper function to execute sql and return rows and fields.
func ExecRows(sctx sessionctx.Context, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) {
sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor)
if !ok {
return nil, nil, errors.Errorf("invalid sql executor")
}
return sqlExec.ExecRestrictedSQL(StatsCtx(context.Background()), []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, args...)
}
// ExecWithOpts is a helper function to execute sql and return rows and fields.
func ExecWithOpts(sctx sessionctx.Context, opts []sqlexec.OptionFuncAlias, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) {
sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor)
if !ok {
return nil, nil, errors.Errorf("invalid sql executor")
}
return sqlExec.ExecRestrictedSQL(StatsCtx(context.Background()), opts, sql, args...)
}