From da5cfbc5f80d75267d041ddca8e22c2ff930c67f Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Wed, 27 Sep 2023 17:31:46 +0800 Subject: [PATCH] planner: unify ways to execute SQLs in stats_handle package (#47324) ref pingcap/tidb#46905 --- executor/infoschema_reader.go | 12 +-- statistics/handle/autoanalyze/BUILD.bazel | 1 - statistics/handle/autoanalyze/autoanalyze.go | 35 +++----- statistics/handle/bootstrap.go | 24 +++-- statistics/handle/cache/BUILD.bazel | 1 - statistics/handle/cache/statscacheinner.go | 27 +++--- statistics/handle/ddl.go | 61 ++++++------- statistics/handle/extstats/BUILD.bazel | 1 - statistics/handle/extstats/extended_stats.go | 46 ++++------ statistics/handle/gc.go | 95 ++++++++++---------- statistics/handle/handle.go | 64 ++++--------- statistics/handle/history/BUILD.bazel | 1 - statistics/handle/history/history_stats.go | 14 +-- statistics/handle/storage/read.go | 7 +- statistics/handle/storage/save.go | 79 ++++++++-------- statistics/handle/update.go | 29 +++--- statistics/handle/usage/index_usage.go | 14 +-- statistics/handle/util/util.go | 35 +++++--- 18 files changed, 230 insertions(+), 316 deletions(-) diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go index 115ce18ab3..382ef32f92 100644 --- a/executor/infoschema_reader.go +++ b/executor/infoschema_reader.go @@ -117,13 +117,13 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex case infoschema.TableStatistics: e.setDataForStatistics(sctx, dbs) case infoschema.TableTables: - err = e.setDataFromTables(ctx, sctx, dbs) + err = e.setDataFromTables(sctx, dbs) case infoschema.TableReferConst: err = e.setDataFromReferConst(sctx, dbs) case infoschema.TableSequences: e.setDataFromSequences(sctx, dbs) case infoschema.TablePartitions: - err = e.setDataFromPartitions(ctx, sctx, dbs) + err = e.setDataFromPartitions(sctx, dbs) case infoschema.TableClusterInfo: err = e.dataForTiDBClusterInfo(sctx) case infoschema.TableAnalyzeStatus: @@ -487,8 +487,8 @@ func (e *memtableRetriever) setDataFromReferConst(sctx sessionctx.Context, schem return nil } -func (e *memtableRetriever) setDataFromTables(ctx context.Context, sctx sessionctx.Context, schemas []*model.DBInfo) error { - err := cache.TableRowStatsCache.Update(ctx, sctx) +func (e *memtableRetriever) setDataFromTables(sctx sessionctx.Context, schemas []*model.DBInfo) error { + err := cache.TableRowStatsCache.Update(sctx) if err != nil { return err } @@ -899,9 +899,9 @@ func calcCharOctLength(lenInChar int, cs string) int { return lenInBytes } -func (e *memtableRetriever) setDataFromPartitions(ctx context.Context, sctx sessionctx.Context, schemas []*model.DBInfo) error { +func (e *memtableRetriever) setDataFromPartitions(sctx sessionctx.Context, schemas []*model.DBInfo) error { cache := cache.TableRowStatsCache - err := cache.Update(ctx, sctx) + err := cache.Update(sctx) if err != nil { return err } diff --git a/statistics/handle/autoanalyze/BUILD.bazel b/statistics/handle/autoanalyze/BUILD.bazel index 53cd7c757a..ce29bac002 100644 --- a/statistics/handle/autoanalyze/BUILD.bazel +++ b/statistics/handle/autoanalyze/BUILD.bazel @@ -7,7 +7,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//infoschema", - "//kv", "//metrics", "//parser/ast", "//parser/model", diff --git a/statistics/handle/autoanalyze/autoanalyze.go b/statistics/handle/autoanalyze/autoanalyze.go index 90b3c5408d..b8f1c93d3c 100644 --- a/statistics/handle/autoanalyze/autoanalyze.go +++ b/statistics/handle/autoanalyze/autoanalyze.go @@ -15,7 +15,6 @@ package autoanalyze import ( - "context" "fmt" "math" "math/rand" @@ -25,7 +24,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" @@ -64,10 +62,9 @@ func parseAnalyzePeriod(start, end string) (time.Time, time.Time, error) { return s, e, err } -func getAutoAnalyzeParameters(exec sqlexec.RestrictedSQLExecutor) map[string]string { - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) +func getAutoAnalyzeParameters(sctx sessionctx.Context) map[string]string { sql := "select variable_name, variable_value from mysql.global_variables where variable_name in (%?, %?, %?)" - rows, _, err := exec.ExecRestrictedSQL(ctx, nil, sql, variable.TiDBAutoAnalyzeRatio, variable.TiDBAutoAnalyzeStartTime, variable.TiDBAutoAnalyzeEndTime) + rows, _, err := statsutil.ExecWithOpts(sctx, nil, sql, variable.TiDBAutoAnalyzeRatio, variable.TiDBAutoAnalyzeStartTime, variable.TiDBAutoAnalyzeEndTime) if err != nil { return map[string]string{} } @@ -103,9 +100,8 @@ func HandleAutoAnalyze(sctx sessionctx.Context, logutil.BgLogger().Error("HandleAutoAnalyze panicked", zap.Any("error", r), zap.Stack("stack")) } }() - exec := sctx.(sqlexec.RestrictedSQLExecutor) dbs := is.AllSchemaNames() - parameters := getAutoAnalyzeParameters(exec) + parameters := getAutoAnalyzeParameters(sctx) autoAnalyzeRatio := parseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio]) start, end, err := parseAnalyzePeriod(parameters[variable.TiDBAutoAnalyzeStartTime], parameters[variable.TiDBAutoAnalyzeEndTime]) if err != nil { @@ -168,7 +164,7 @@ func HandleAutoAnalyze(sctx sessionctx.Context, if pi == nil { statsTbl := opt.GetTableStats(tblInfo) sql := "analyze table %n.%n" - analyzed := autoAnalyzeTable(sctx, exec, opt, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O) + analyzed := autoAnalyzeTable(sctx, opt, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O) if analyzed { // analyze one table at a time to let it get the freshest parameters. // others will be analyzed next round which is just 3s later. @@ -184,7 +180,7 @@ func HandleAutoAnalyze(sctx sessionctx.Context, } } if pruneMode == variable.Dynamic { - analyzed := autoAnalyzePartitionTableInDynamicMode(sctx, exec, opt, tblInfo, partitionDefs, db, autoAnalyzeRatio) + analyzed := autoAnalyzePartitionTableInDynamicMode(sctx, opt, tblInfo, partitionDefs, db, autoAnalyzeRatio) if analyzed { return true } @@ -193,7 +189,7 @@ func HandleAutoAnalyze(sctx sessionctx.Context, for _, def := range partitionDefs { sql := "analyze table %n.%n partition %n" statsTbl := opt.GetPartitionStats(tblInfo, def.ID) - analyzed := autoAnalyzeTable(sctx, exec, opt, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O, def.Name.O) + analyzed := autoAnalyzeTable(sctx, opt, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O, def.Name.O) if analyzed { return true } @@ -207,7 +203,6 @@ func HandleAutoAnalyze(sctx sessionctx.Context, var AutoAnalyzeMinCnt int64 = 1000 func autoAnalyzeTable(sctx sessionctx.Context, - exec sqlexec.RestrictedSQLExecutor, opt *Opt, tblInfo *model.TableInfo, statsTbl *statistics.Table, ratio float64, sql string, params ...interface{}) bool { @@ -222,7 +217,7 @@ func autoAnalyzeTable(sctx sessionctx.Context, logutil.BgLogger().Info("auto analyze triggered", zap.String("category", "stats"), zap.String("sql", escaped), zap.String("reason", reason)) tableStatsVer := sctx.GetSessionVars().AnalyzeVersion statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer) - execAutoAnalyze(sctx, exec, opt, tableStatsVer, sql, params...) + execAutoAnalyze(sctx, opt, tableStatsVer, sql, params...) return true } for _, idx := range tblInfo.Indices { @@ -236,7 +231,7 @@ func autoAnalyzeTable(sctx sessionctx.Context, logutil.BgLogger().Info("auto analyze for unanalyzed", zap.String("category", "stats"), zap.String("sql", escaped)) tableStatsVer := sctx.GetSessionVars().AnalyzeVersion statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer) - execAutoAnalyze(sctx, exec, opt, tableStatsVer, sqlWithIdx, paramsWithIdx...) + execAutoAnalyze(sctx, opt, tableStatsVer, sqlWithIdx, paramsWithIdx...) return true } } @@ -288,7 +283,6 @@ func TableAnalyzed(tbl *statistics.Table) bool { } func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context, - exec sqlexec.RestrictedSQLExecutor, opt *Opt, tblInfo *model.TableInfo, partitionDefs []model.PartitionDefinition, db string, ratio float64) bool { @@ -335,7 +329,7 @@ func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context, logutil.BgLogger().Info("auto analyze triggered", zap.String("category", "stats"), zap.String("table", tblInfo.Name.String()), zap.Any("partitions", partitionNames[start:end])) - execAutoAnalyze(sctx, exec, opt, tableStatsVer, sql, params...) + execAutoAnalyze(sctx, opt, tableStatsVer, sql, params...) } return true } @@ -366,7 +360,7 @@ func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context, zap.String("table", tblInfo.Name.String()), zap.String("index", idx.Name.String()), zap.Any("partitions", partitionNames[start:end])) - execAutoAnalyze(sctx, exec, opt, tableStatsVer, sql, params...) + execAutoAnalyze(sctx, opt, tableStatsVer, sql, params...) } return true } @@ -381,12 +375,11 @@ var execOptionForAnalyze = map[int]sqlexec.OptionFuncAlias{ } func execAutoAnalyze(sctx sessionctx.Context, - exec sqlexec.RestrictedSQLExecutor, opt *Opt, statsVer int, sql string, params ...interface{}) { startTime := time.Now() - _, _, err := execRestrictedSQLWithStatsVer(sctx, exec, opt, statsVer, sql, params...) + _, _, err := execAnalyzeStmt(sctx, opt, statsVer, sql, params...) dur := time.Since(startTime) metrics.AutoAnalyzeHistogram.Observe(dur.Seconds()) if err != nil { @@ -401,12 +394,10 @@ func execAutoAnalyze(sctx sessionctx.Context, } } -func execRestrictedSQLWithStatsVer(sctx sessionctx.Context, - exec sqlexec.RestrictedSQLExecutor, +func execAnalyzeStmt(sctx sessionctx.Context, opt *Opt, statsVer int, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) { - ctx := statsutil.StatsCtx(context.Background()) pruneMode := sctx.GetSessionVars().PartitionPruneMode.Load() analyzeSnapshot := sctx.GetSessionVars().EnableAnalyzeSnapshot optFuncs := []sqlexec.OptionFuncAlias{ @@ -416,5 +407,5 @@ func execRestrictedSQLWithStatsVer(sctx sessionctx.Context, sqlexec.ExecOptionUseCurSession, sqlexec.ExecOptionWithSysProcTrack(opt.AutoAnalyzeProcIDGetter(), opt.SysProcTracker.Track, opt.SysProcTracker.UnTrack), } - return exec.ExecRestrictedSQL(ctx, optFuncs, sql, params...) + return statsutil.ExecWithOpts(sctx, optFuncs, sql, params...) } diff --git a/statistics/handle/bootstrap.go b/statistics/handle/bootstrap.go index 952a38f7b4..eef4eef089 100644 --- a/statistics/handle/bootstrap.go +++ b/statistics/handle/bootstrap.go @@ -29,10 +29,10 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle/cache" + "github.com/pingcap/tidb/statistics/handle/util" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/sqlexec" "go.uber.org/zap" ) @@ -66,7 +66,7 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *cache.Stat func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (*cache.StatsCache, error) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta" - rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql) + rc, err := util.Exec(h.initStatsCtx, sql) if err != nil { return nil, errors.Trace(err) } @@ -234,7 +234,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *cach func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache *cache.StatsCache) error { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms" - rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql) + rc, err := util.Exec(h.initStatsCtx, sql) if err != nil { return errors.Trace(err) } @@ -257,7 +257,7 @@ func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache *cache. func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *cache.StatsCache) error { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms" - rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql) + rc, err := util.Exec(h.initStatsCtx, sql) if err != nil { return errors.Trace(err) } @@ -306,7 +306,7 @@ func (*Handle) initStatsTopN4Chunk(cache *cache.StatsCache, iter *chunk.Iterator func (h *Handle) initStatsTopN(cache *cache.StatsCache) error { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1" - rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql) + rc, err := util.Exec(h.initStatsCtx, sql) if err != nil { return errors.Trace(err) } @@ -356,7 +356,7 @@ func (*Handle) initStatsFMSketch4Chunk(cache *cache.StatsCache, iter *chunk.Iter func (h *Handle) initStatsFMSketch(cache *cache.StatsCache) error { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) sql := "select HIGH_PRIORITY table_id, is_index, hist_id, value from mysql.stats_fm_sketch" - rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql) + rc, err := util.Exec(h.initStatsCtx, sql) if err != nil { return errors.Trace(err) } @@ -429,7 +429,7 @@ func (*Handle) initStatsBuckets4Chunk(cache *cache.StatsCache, iter *chunk.Itera func (h *Handle) initStatsBuckets(cache *cache.StatsCache) error { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id" - rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql) + rc, err := util.Exec(h.initStatsCtx, sql) if err != nil { return errors.Trace(err) } @@ -468,14 +468,13 @@ func (h *Handle) initStatsBuckets(cache *cache.StatsCache) error { // InitStatsLite initiates the stats cache. The function is liter and faster than InitStats. // Column/index stats are not loaded, i.e., we only load scalars such as NDV, NullCount, Correlation and don't load CMSketch/Histogram/TopN. func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) { - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) defer func() { - _, err1 := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "commit") + _, err1 := util.Exec(h.initStatsCtx, "commit") if err == nil && err1 != nil { err = err1 } }() - _, err = h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "begin") + _, err = util.Exec(h.initStatsCtx, "begin") if err != nil { return err } @@ -496,14 +495,13 @@ func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) { // Column stats are not loaded, i.e., we only load scalars such as NDV, NullCount, Correlation and don't load CMSketch/Histogram/TopN. func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) { loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) defer func() { - _, err1 := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "commit") + _, err1 := util.Exec(h.initStatsCtx, "commit") if err == nil && err1 != nil { err = err1 } }() - _, err = h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "begin") + _, err = util.Exec(h.initStatsCtx, "begin") if err != nil { return err } diff --git a/statistics/handle/cache/BUILD.bazel b/statistics/handle/cache/BUILD.bazel index 9a057ea7e0..61c3aa59df 100644 --- a/statistics/handle/cache/BUILD.bazel +++ b/statistics/handle/cache/BUILD.bazel @@ -22,7 +22,6 @@ go_library( "//types", "//util/chunk", "//util/logutil", - "//util/sqlexec", "//util/syncutil", "@org_uber_go_zap//:zap", ], diff --git a/statistics/handle/cache/statscacheinner.go b/statistics/handle/cache/statscacheinner.go index b8149421ba..4741e14ae4 100644 --- a/statistics/handle/cache/statscacheinner.go +++ b/statistics/handle/cache/statscacheinner.go @@ -15,7 +15,6 @@ package cache import ( - "context" "strconv" "strings" "sync/atomic" @@ -34,7 +33,6 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/syncutil" "go.uber.org/zap" ) @@ -223,20 +221,19 @@ func (c *StatsTableRowCache) GetColLength(id tableHistID) uint64 { } // Update tries to update the cache. -func (c *StatsTableRowCache) Update(ctx context.Context, sctx sessionctx.Context) error { +func (c *StatsTableRowCache) Update(sctx sessionctx.Context) error { c.mu.Lock() defer c.mu.Unlock() - ctx = util.StatsCtx(ctx) if time.Since(c.modifyTime) < tableStatsCacheExpiry { if len(c.dirtyIDs) > 0 { - tableRows, err := getRowCountTables(ctx, sctx, c.dirtyIDs...) + tableRows, err := getRowCountTables(sctx, c.dirtyIDs...) if err != nil { return err } for id, tr := range tableRows { c.tableRows[id] = tr } - colLength, err := getColLengthTables(ctx, sctx, c.dirtyIDs...) + colLength, err := getColLengthTables(sctx, c.dirtyIDs...) if err != nil { return err } @@ -247,11 +244,11 @@ func (c *StatsTableRowCache) Update(ctx context.Context, sctx sessionctx.Context } return nil } - tableRows, err := getRowCountTables(ctx, sctx) + tableRows, err := getRowCountTables(sctx) if err != nil { return err } - colLength, err := getColLengthTables(ctx, sctx) + colLength, err := getColLengthTables(sctx) if err != nil { return err } @@ -262,16 +259,15 @@ func (c *StatsTableRowCache) Update(ctx context.Context, sctx sessionctx.Context return nil } -func getRowCountTables(ctx context.Context, sctx sessionctx.Context, tableIDs ...int64) (map[int64]uint64, error) { - exec := sctx.(sqlexec.RestrictedSQLExecutor) +func getRowCountTables(sctx sessionctx.Context, tableIDs ...int64) (map[int64]uint64, error) { var rows []chunk.Row var err error if len(tableIDs) == 0 { - rows, _, err = exec.ExecRestrictedSQL(ctx, nil, "select table_id, count from mysql.stats_meta") + rows, _, err = util.ExecWithOpts(sctx, nil, "select table_id, count from mysql.stats_meta") } else { inTblIDs := buildInTableIDsString(tableIDs) sql := "select table_id, count from mysql.stats_meta where " + inTblIDs - rows, _, err = exec.ExecRestrictedSQL(ctx, nil, sql) + rows, _, err = util.ExecWithOpts(sctx, nil, sql) } if err != nil { return nil, err @@ -304,17 +300,16 @@ type tableHistID struct { histID int64 } -func getColLengthTables(ctx context.Context, sctx sessionctx.Context, tableIDs ...int64) (map[tableHistID]uint64, error) { - exec := sctx.(sqlexec.RestrictedSQLExecutor) +func getColLengthTables(sctx sessionctx.Context, tableIDs ...int64) (map[tableHistID]uint64, error) { var rows []chunk.Row var err error if len(tableIDs) == 0 { sql := "select table_id, hist_id, tot_col_size from mysql.stats_histograms where is_index = 0" - rows, _, err = exec.ExecRestrictedSQL(ctx, nil, sql) + rows, _, err = util.ExecWithOpts(sctx, nil, sql) } else { inTblIDs := buildInTableIDsString(tableIDs) sql := "select table_id, hist_id, tot_col_size from mysql.stats_histograms where is_index = 0 and " + inTblIDs - rows, _, err = exec.ExecRestrictedSQL(ctx, nil, sql) + rows, _, err = util.ExecWithOpts(sctx, nil, sql) } if err != nil { return nil, err diff --git a/statistics/handle/ddl.go b/statistics/handle/ddl.go index 5a4ef1bc6a..65b096f0db 100644 --- a/statistics/handle/ddl.go +++ b/statistics/handle/ddl.go @@ -132,27 +132,26 @@ func (h *Handle) updateStatsVersion() error { return err } defer h.pool.Put(se) - exec := se.(sqlexec.SQLExecutor) + sctx := se.(sessionctx.Context) - ctx := statsutil.StatsCtx(context.Background()) - _, err = exec.ExecuteInternal(ctx, "begin") + _, err = statsutil.Exec(sctx, "begin") if err != nil { return errors.Trace(err) } defer func() { - err = statsutil.FinishTransaction(ctx, exec, err) + err = statsutil.FinishTransaction(sctx, err) }() startTS, err := getSessionTxnStartTS(se) if err != nil { return errors.Trace(err) } - if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?", startTS); err != nil { + if _, err = statsutil.Exec(sctx, "update mysql.stats_meta set version = %?", startTS); err != nil { return err } - if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_extended set version = %?", startTS); err != nil { + if _, err = statsutil.Exec(sctx, "update mysql.stats_extended set version = %?", startTS); err != nil { return err } - if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set version = %?", startTS); err != nil { + if _, err = statsutil.Exec(sctx, "update mysql.stats_histograms set version = %?", startTS); err != nil { return err } @@ -270,17 +269,16 @@ func (h *Handle) changeGlobalStatsID(from, to int64) (err error) { return err } defer h.pool.Put(se) - exec := se.(sqlexec.SQLExecutor) - ctx := statsutil.StatsCtx(context.Background()) - _, err = exec.ExecuteInternal(ctx, "begin pessimistic") + sctx := se.(sessionctx.Context) + _, err = statsutil.Exec(sctx, "begin pessimistic") if err != nil { return errors.Trace(err) } defer func() { - err = statsutil.FinishTransaction(ctx, exec, err) + err = statsutil.FinishTransaction(sctx, err) }() for _, table := range []string{"stats_meta", "stats_top_n", "stats_fm_sketch", "stats_buckets", "stats_histograms", "column_stats_usage"} { - _, err = exec.ExecuteInternal(ctx, "update mysql."+table+" set table_id = %? where table_id = %?", to, from) + _, err = statsutil.Exec(sctx, "update mysql."+table+" set table_id = %? where table_id = %?", to, from) if err != nil { return err } @@ -327,31 +325,30 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (e return err } defer h.pool.Put(se) - exec := se.(sqlexec.SQLExecutor) - ctx := statsutil.StatsCtx(context.Background()) + sctx := se.(sessionctx.Context) - _, err = exec.ExecuteInternal(ctx, "begin") + _, err = statsutil.Exec(sctx, "begin") if err != nil { return errors.Trace(err) } defer func() { - err = statsutil.FinishTransaction(ctx, exec, err) + err = statsutil.FinishTransaction(sctx, err) }() startTS, err := getSessionTxnStartTS(se) if err != nil { return errors.Trace(err) } - if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil { + if _, err := statsutil.Exec(sctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil { return err } statsVer = startTS for _, col := range info.Columns { - if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil { + if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil { return err } } for _, idx := range info.Indices { - if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 1, %?, 0, %?)", physicalID, idx.ID, startTS); err != nil { + if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 1, %?, 0, %?)", physicalID, idx.ID, startTS); err != nil { return err } } @@ -372,21 +369,20 @@ func (h *Handle) resetTableStats2KVForDrop(physicalID int64) (err error) { return err } defer h.pool.Put(se) - exec := se.(sqlexec.SQLExecutor) - ctx := statsutil.StatsCtx(context.Background()) + sctx := se.(sessionctx.Context) - _, err = exec.ExecuteInternal(ctx, "begin") + _, err = statsutil.Exec(sctx, "begin") if err != nil { return errors.Trace(err) } defer func() { - err = statsutil.FinishTransaction(ctx, exec, err) + err = statsutil.FinishTransaction(sctx, err) }() startTS, err := getSessionTxnStartTS(se) if err != nil { return errors.Trace(err) } - if _, err := exec.ExecuteInternal(ctx, "update mysql.stats_meta set version=%? where table_id =%?", startTS, physicalID); err != nil { + if _, err := statsutil.Exec(sctx, "update mysql.stats_meta set version=%? where table_id =%?", startTS, physicalID); err != nil { return err } return nil @@ -407,25 +403,24 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf return err } defer h.pool.Put(se) - exec := se.(sqlexec.SQLExecutor) + sctx := se.(sessionctx.Context) ctx := statsutil.StatsCtx(context.Background()) - _, err = exec.ExecuteInternal(ctx, "begin") + _, err = statsutil.Exec(sctx, "begin") if err != nil { return errors.Trace(err) } defer func() { - err = statsutil.FinishTransaction(ctx, exec, err) + err = statsutil.FinishTransaction(sctx, err) }() startTS, err := getSessionTxnStartTS(se) if err != nil { return errors.Trace(err) } - sctx := se.(sessionctx.Context) // First of all, we update the version. - _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, physicalID) + _, err = statsutil.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, physicalID) if err != nil { return } @@ -434,7 +429,7 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf if sctx.GetSessionVars().StmtCtx.AffectedRows() > 0 { // By this step we can get the count of this table, then we can sure the count and repeats of bucket. var rs sqlexec.RecordSet - rs, err = exec.ExecuteInternal(ctx, "select count from mysql.stats_meta where table_id = %?", physicalID) + rs, err = statsutil.Exec(sctx, "select count from mysql.stats_meta where table_id = %?", physicalID) if err != nil { return } @@ -453,12 +448,12 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf } if value.IsNull() { // If the adding column has default value null, all the existing rows have null value on the newly added column. - if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil { + if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil { return err } } else { // If this stats exists, we insert histogram meta first, the distinct_count will always be one. - if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil { + if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil { return err } value, err = value.ConvertTo(sctx.GetSessionVars().StmtCtx, types.NewFieldType(mysql.TypeBlob)) @@ -466,7 +461,7 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf return } // There must be only one bucket for this new column and the value is the default value. - if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil { + if _, err := statsutil.Exec(sctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil { return err } } diff --git a/statistics/handle/extstats/BUILD.bazel b/statistics/handle/extstats/BUILD.bazel index 6e320fd7c2..aa95fde972 100644 --- a/statistics/handle/extstats/BUILD.bazel +++ b/statistics/handle/extstats/BUILD.bazel @@ -14,7 +14,6 @@ go_library( "//statistics/handle/util", "//util/logutil", "//util/mathutil", - "//util/sqlexec", "@com_github_pingcap_errors//:errors", "@org_uber_go_zap//:zap", ], diff --git a/statistics/handle/extstats/extended_stats.go b/statistics/handle/extstats/extended_stats.go index 8ddc94c564..9c31efa71f 100644 --- a/statistics/handle/extstats/extended_stats.go +++ b/statistics/handle/extstats/extended_stats.go @@ -15,7 +15,6 @@ package extstats import ( - "context" "encoding/json" "fmt" "slices" @@ -29,7 +28,6 @@ import ( "github.com/pingcap/tidb/statistics/handle/util" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mathutil" - "github.com/pingcap/tidb/util/sqlexec" "go.uber.org/zap" ) @@ -50,7 +48,6 @@ func InsertExtendedStats(sctx sessionctx.Context, recordHistoricalStatsMeta(tableID, statsVer, StatsMetaHistorySourceExtendedStats) } }() - exec := sctx.(sqlexec.RestrictedSQLExecutor) slices.Sort(colIDs) bytes, err := json.Marshal(colIDs) if err != nil { @@ -58,17 +55,15 @@ func InsertExtendedStats(sctx sessionctx.Context, } strColIDs := string(bytes) - ctx := util.StatsCtx(context.Background()) - sqlExecutor := exec.(sqlexec.SQLExecutor) - _, err = sqlExecutor.ExecuteInternal(ctx, "begin pessimistic") + _, err = util.Exec(sctx, "begin pessimistic") if err != nil { return errors.Trace(err) } defer func() { - err = util.FinishTransaction(ctx, sqlExecutor, err) + err = util.FinishTransaction(sctx, err) }() // No need to use `exec.ExecuteInternal` since we have acquired the lock. - rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)", tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed) + rows, _, err := util.ExecRows(sctx, "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)", tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed) if err != nil { return errors.Trace(err) } @@ -91,12 +86,12 @@ func InsertExtendedStats(sctx sessionctx.Context, return errors.Trace(err) } // Bump version in `mysql.stats_meta` to trigger stats cache refresh. - if _, err = sqlExecutor.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil { + if _, err = util.Exec(sctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil { return err } statsVer = version // Remove the existing 'deleted' records. - if _, err = sqlExecutor.ExecuteInternal(ctx, "DELETE FROM mysql.stats_extended WHERE name = %? and table_id = %?", statsName, tableID); err != nil { + if _, err = util.Exec(sctx, "DELETE FROM mysql.stats_extended WHERE name = %? and table_id = %?", statsName, tableID); err != nil { return err } // Remove the cache item, which is necessary for cases like a cluster with 3 tidb instances, e.g, a, b and c. @@ -106,7 +101,7 @@ func InsertExtendedStats(sctx sessionctx.Context, // next `Update()` to remove the cached item then. removeExtendedStatsItem(currentCache, updateStatsCache, tableID, statsName) const sql = "INSERT INTO mysql.stats_extended(name, type, table_id, column_ids, version, status) VALUES (%?, %?, %?, %?, %?, %?)" - if _, err = sqlExecutor.ExecuteInternal(ctx, sql, statsName, tp, tableID, strColIDs, version, statistics.ExtendedStatsInited); err != nil { + if _, err = util.Exec(sctx, sql, statsName, tp, tableID, strColIDs, version, statistics.ExtendedStatsInited); err != nil { return err } return @@ -124,9 +119,7 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context, recordHistoricalStatsMeta(tableID, statsVer, StatsMetaHistorySourceExtendedStats) } }() - exec := sctx.(sqlexec.RestrictedSQLExecutor) - ctx := util.StatsCtx(context.Background()) - rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %? and status in (%?, %?)", statsName, tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed) + rows, _, err := util.ExecRows(sctx, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %? and status in (%?, %?)", statsName, tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed) if err != nil { return errors.Trace(err) } @@ -140,14 +133,12 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context, logutil.BgLogger().Warn("unexpected duplicate extended stats records found", zap.String("name", statsName), zap.Int64("table_id", tableID)) } - sqlExec := exec.(sqlexec.SQLExecutor) - - _, err = sqlExec.ExecuteInternal(ctx, "begin pessimistic") + _, err = util.Exec(sctx, "begin pessimistic") if err != nil { return errors.Trace(err) } defer func() { - err1 := util.FinishTransaction(ctx, sqlExec, err) + err1 := util.FinishTransaction(sctx, err) if err == nil && err1 == nil { removeExtendedStatsItem(currentCache, updateStatsCache, tableID, statsName) } @@ -157,11 +148,11 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context, if err != nil { return errors.Trace(err) } - if _, err = sqlExec.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil { + if _, err = util.Exec(sctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil { return err } statsVer = version - if _, err = sqlExec.ExecuteInternal(ctx, "UPDATE mysql.stats_extended SET version = %?, status = %? WHERE name = %? and table_id = %?", version, statistics.ExtendedStatsDeleted, statsName, tableID); err != nil { + if _, err = util.Exec(sctx, "UPDATE mysql.stats_extended SET version = %?, status = %? WHERE name = %? and table_id = %?", version, statistics.ExtendedStatsDeleted, statsName, tableID); err != nil { return err } return nil @@ -170,10 +161,8 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context, // BuildExtendedStats build extended stats for column groups if needed based on the column samples. func BuildExtendedStats(sctx sessionctx.Context, tableID int64, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) (*statistics.ExtendedStatsColl, error) { - ctx := util.StatsCtx(context.Background()) - exec := sctx.(sqlexec.RestrictedSQLExecutor) const sql = "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)" - rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, tableID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited) + rows, _, err := util.ExecRows(sctx, sql, tableID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited) if err != nil { return nil, errors.Trace(err) } @@ -293,15 +282,12 @@ func SaveExtendedStatsToStorage(sctx sessionctx.Context, return nil } - sqlExec := sctx.(sqlexec.SQLExecutor) - ctx := util.StatsCtx(context.Background()) - - _, err = sqlExec.ExecuteInternal(ctx, "begin pessimistic") + _, err = util.Exec(sctx, "begin pessimistic") if err != nil { return errors.Trace(err) } defer func() { - err = util.FinishTransaction(ctx, sqlExec, err) + err = util.FinishTransaction(sctx, err) }() version, err := util.GetStartTS(sctx) if err != nil { @@ -321,12 +307,12 @@ func SaveExtendedStatsToStorage(sctx sessionctx.Context, statsStr = item.StringVals } // If isLoad is true, it's INSERT; otherwise, it's UPDATE. - if _, err := sqlExec.ExecuteInternal(ctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, %?, %?)", name, item.Tp, tableID, strColIDs, statsStr, version, statistics.ExtendedStatsAnalyzed); err != nil { + if _, err := util.Exec(sctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, %?, %?)", name, item.Tp, tableID, strColIDs, statsStr, version, statistics.ExtendedStatsAnalyzed); err != nil { return err } } if !isLoad { - if _, err := sqlExec.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil { + if _, err := util.Exec(sctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil { return err } statsVer = version diff --git a/statistics/handle/gc.go b/statistics/handle/gc.go index c2aef97994..d7107fc897 100644 --- a/statistics/handle/gc.go +++ b/statistics/handle/gc.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle/cache" @@ -43,7 +44,6 @@ const gcLastTSVarName = "tidb_stats_gc_last_ts" // For dropped tables, we will first update their version // so that other tidb could know that table is deleted. func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err error) { - ctx := context.Background() // To make sure that all the deleted tables' schema and stats info have been acknowledged to all tidb, // we only garbage collect version before 10 lease. lease := mathutil.Max(h.Lease(), ddlLease) @@ -55,7 +55,7 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err // Get the last gc time. gcVer := now - offset - lastGC, err := h.GetLastGCTimestamp(ctx) + lastGC, err := h.GetLastGCTimestamp() if err != nil { return err } @@ -63,10 +63,10 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err if err != nil { return } - err = h.writeGCTimestampToKV(ctx, gcVer) + err = h.writeGCTimestampToKV(gcVer) }() - rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_meta where version >= %? and version < %?", lastGC, gcVer) + rows, _, err := h.execRows("select table_id from mysql.stats_meta where version >= %? and version < %?", lastGC, gcVer) if err != nil { return errors.Trace(err) } @@ -92,8 +92,8 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err } // GetLastGCTimestamp loads the last gc time from mysql.tidb. -func (h *Handle) GetLastGCTimestamp(ctx context.Context) (uint64, error) { - rows, _, err := h.execRestrictedSQL(ctx, "SELECT HIGH_PRIORITY variable_value FROM mysql.tidb WHERE variable_name=%?", gcLastTSVarName) +func (h *Handle) GetLastGCTimestamp() (uint64, error) { + rows, _, err := h.execRows("SELECT HIGH_PRIORITY variable_value FROM mysql.tidb WHERE variable_name=%?", gcLastTSVarName) if err != nil { return 0, errors.Trace(err) } @@ -108,8 +108,8 @@ func (h *Handle) GetLastGCTimestamp(ctx context.Context) (uint64, error) { return lastGcTS, nil } -func (h *Handle) writeGCTimestampToKV(ctx context.Context, newTS uint64) error { - _, _, err := h.execRestrictedSQL(ctx, +func (h *Handle) writeGCTimestampToKV(newTS uint64) error { + _, _, err := h.execRows( "insert into mysql.tidb (variable_name, variable_value) values (%?, %?) on duplicate key update variable_value = %?", gcLastTSVarName, newTS, @@ -119,15 +119,14 @@ func (h *Handle) writeGCTimestampToKV(ctx context.Context, newTS uint64) error { } func (h *Handle) gcTableStats(is infoschema.InfoSchema, physicalID int64) error { - ctx := context.Background() - rows, _, err := h.execRestrictedSQL(ctx, "select is_index, hist_id from mysql.stats_histograms where table_id = %?", physicalID) + rows, _, err := h.execRows("select is_index, hist_id from mysql.stats_histograms where table_id = %?", physicalID) if err != nil { return errors.Trace(err) } // The table has already been deleted in stats and acknowledged to all tidb, // we can safely remove the meta info now. if len(rows) == 0 { - _, _, err = h.execRestrictedSQL(ctx, "delete from mysql.stats_meta where table_id = %?", physicalID) + _, _, err = h.execRows("delete from mysql.stats_meta where table_id = %?", physicalID) if err != nil { return errors.Trace(err) } @@ -164,7 +163,7 @@ func (h *Handle) gcTableStats(is infoschema.InfoSchema, physicalID int64) error } } // Mark records in mysql.stats_extended as `deleted`. - rows, _, err = h.execRestrictedSQL(ctx, "select name, column_ids from mysql.stats_extended where table_id = %? and status in (%?, %?)", physicalID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited) + rows, _, err = h.execRows("select name, column_ids from mysql.stats_extended where table_id = %? and status in (%?, %?)", physicalID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited) if err != nil { return errors.Trace(err) } @@ -209,9 +208,9 @@ func (h *Handle) ClearOutdatedHistoryStats() error { return err } defer h.pool.Put(se) - exec := se.(sqlexec.SQLExecutor) + sctx := se.(sessionctx.Context) sql := "select count(*) from mysql.stats_meta_history use index (idx_create_time) where create_time <= NOW() - INTERVAL %? SECOND" - rs, err := exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) + rs, err := util.Exec(sctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) if err != nil { return err } @@ -226,12 +225,12 @@ func (h *Handle) ClearOutdatedHistoryStats() error { count := rows[0].GetInt64(0) if count > 0 { sql = "delete from mysql.stats_meta_history use index (idx_create_time) where create_time <= NOW() - INTERVAL %? SECOND" - _, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) + _, err = util.Exec(sctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) if err != nil { return err } sql = "delete from mysql.stats_history use index (idx_create_time) where create_time <= NOW() - INTERVAL %? SECOND" - _, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) + _, err = util.Exec(sctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) logutil.BgLogger().Info("clear outdated historical stats") return err } @@ -244,23 +243,22 @@ func (h *Handle) gcHistoryStatsFromKV(physicalID int64) error { return err } defer h.pool.Put(se) - exec := se.(sqlexec.SQLExecutor) - ctx := util.StatsCtx(context.Background()) + sctx := se.(sessionctx.Context) - _, err = exec.ExecuteInternal(ctx, "begin pessimistic") + _, err = util.Exec(sctx, "begin pessimistic") if err != nil { return errors.Trace(err) } defer func() { - err = util.FinishTransaction(ctx, exec, err) + err = util.FinishTransaction(sctx, err) }() sql := "delete from mysql.stats_history where table_id = %?" - _, err = exec.ExecuteInternal(ctx, sql, physicalID) + _, err = util.Exec(sctx, sql, physicalID) if err != nil { return errors.Trace(err) } sql = "delete from mysql.stats_meta_history where table_id = %?" - _, err = exec.ExecuteInternal(ctx, sql, physicalID) + _, err = util.Exec(sctx, sql, physicalID) return err } @@ -271,43 +269,42 @@ func (h *Handle) deleteHistStatsFromKV(physicalID int64, histID int64, isIndex i return err } defer h.pool.Put(se) - exec := se.(sqlexec.SQLExecutor) - ctx := util.StatsCtx(context.Background()) + sctx := se.(sessionctx.Context) - _, err = exec.ExecuteInternal(ctx, "begin") + _, err = util.Exec(sctx, "begin") if err != nil { return errors.Trace(err) } defer func() { - err = util.FinishTransaction(ctx, exec, err) + err = util.FinishTransaction(sctx, err) }() startTS, err := getSessionTxnStartTS(se) if err != nil { return errors.Trace(err) } // First of all, we update the version. If this table doesn't exist, it won't have any problem. Because we cannot delete anything. - if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, physicalID); err != nil { + if _, err = util.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, physicalID); err != nil { return err } // delete histogram meta - if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { + if _, err = util.Exec(sctx, "delete from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { return err } // delete top n data - if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { + if _, err = util.Exec(sctx, "delete from mysql.stats_top_n where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { return err } // delete all buckets - if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { + if _, err = util.Exec(sctx, "delete from mysql.stats_buckets where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { return err } // delete all fm sketch - if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { + if _, err := util.Exec(sctx, "delete from mysql.stats_fm_sketch where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { return err } if isIndex == 0 { // delete the record in mysql.column_stats_usage - if _, err = exec.ExecuteInternal(ctx, "delete from mysql.column_stats_usage where table_id = %? and column_id = %?", physicalID, histID); err != nil { + if _, err = util.Exec(sctx, "delete from mysql.column_stats_usage where table_id = %? and column_id = %?", physicalID, histID); err != nil { return err } } @@ -322,14 +319,13 @@ func (h *Handle) DeleteTableStatsFromKV(statsIDs []int64) (err error) { return err } defer h.pool.Put(se) - exec := se.(sqlexec.SQLExecutor) - ctx := util.StatsCtx(context.Background()) - _, err = exec.ExecuteInternal(ctx, "begin") + sctx := se.(sessionctx.Context) + _, err = util.Exec(sctx, "begin") if err != nil { return errors.Trace(err) } defer func() { - err = util.FinishTransaction(ctx, exec, err) + err = util.FinishTransaction(sctx, err) }() startTS, err := getSessionTxnStartTS(se) if err != nil { @@ -337,31 +333,31 @@ func (h *Handle) DeleteTableStatsFromKV(statsIDs []int64) (err error) { } for _, statsID := range statsIDs { // We only update the version so that other tidb will know that this table is deleted. - if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, statsID); err != nil { + if _, err = util.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, statsID); err != nil { return err } - if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_histograms where table_id = %?", statsID); err != nil { + if _, err = util.Exec(sctx, "delete from mysql.stats_histograms where table_id = %?", statsID); err != nil { return err } - if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %?", statsID); err != nil { + if _, err = util.Exec(sctx, "delete from mysql.stats_buckets where table_id = %?", statsID); err != nil { return err } - if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %?", statsID); err != nil { + if _, err = util.Exec(sctx, "delete from mysql.stats_top_n where table_id = %?", statsID); err != nil { return err } - if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_extended set version = %?, status = %? where table_id = %? and status in (%?, %?)", startTS, statistics.ExtendedStatsDeleted, statsID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited); err != nil { + if _, err = util.Exec(sctx, "update mysql.stats_extended set version = %?, status = %? where table_id = %? and status in (%?, %?)", startTS, statistics.ExtendedStatsDeleted, statsID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited); err != nil { return err } - if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %?", statsID); err != nil { + if _, err = util.Exec(sctx, "delete from mysql.stats_fm_sketch where table_id = %?", statsID); err != nil { return err } - if _, err = exec.ExecuteInternal(ctx, "delete from mysql.column_stats_usage where table_id = %?", statsID); err != nil { + if _, err = util.Exec(sctx, "delete from mysql.column_stats_usage where table_id = %?", statsID); err != nil { return err } - if _, err = exec.ExecuteInternal(ctx, "delete from mysql.analyze_options where table_id = %?", statsID); err != nil { + if _, err = util.Exec(sctx, "delete from mysql.analyze_options where table_id = %?", statsID); err != nil { return err } - if _, err = exec.ExecuteInternal(ctx, lockstats.DeleteLockSQL, statsID); err != nil { + if _, err = util.Exec(sctx, lockstats.DeleteLockSQL, statsID); err != nil { return err } } @@ -374,16 +370,15 @@ func (h *Handle) removeDeletedExtendedStats(version uint64) (err error) { return err } defer h.pool.Put(se) - exec := se.(sqlexec.SQLExecutor) - ctx := util.StatsCtx(context.Background()) - _, err = exec.ExecuteInternal(ctx, "begin pessimistic") + sctx := se.(sessionctx.Context) + _, err = util.Exec(sctx, "begin pessimistic") if err != nil { return errors.Trace(err) } defer func() { - err = util.FinishTransaction(ctx, exec, err) + err = util.FinishTransaction(sctx, err) }() const sql = "delete from mysql.stats_extended where status = %? and version < %?" - _, err = exec.ExecuteInternal(ctx, sql, statistics.ExtendedStatsDeleted, version) + _, err = util.Exec(sctx, sql, statistics.ExtendedStatsDeleted, version) return } diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 4f3199c54f..b5af82ef79 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -15,7 +15,6 @@ package handle import ( - "context" "encoding/json" "fmt" "math" @@ -106,31 +105,12 @@ type Handle struct { lease atomic2.Duration } -func (h *Handle) withRestrictedSQLExecutor(ctx context.Context, fn func(context.Context, sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error)) ([]chunk.Row, []*ast.ResultField, error) { - se, err := h.pool.Get() - if err != nil { - return nil, nil, errors.Trace(err) - } - defer h.pool.Put(se) - - exec := se.(sqlexec.RestrictedSQLExecutor) - return fn(ctx, exec) -} - -func (h *Handle) execRestrictedSQL(ctx context.Context, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) { - return h.withRestrictedSQLExecutor(util.StatsCtx(ctx), func(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error) { - return exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, params...) - }) -} - -func (h *Handle) execRestrictedSQLWithSnapshot(ctx context.Context, sql string, snapshot uint64, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) { - return h.withRestrictedSQLExecutor(util.StatsCtx(ctx), func(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error) { - optFuncs := []sqlexec.OptionFuncAlias{ - sqlexec.ExecOptionWithSnapshot(snapshot), - sqlexec.ExecOptionUseCurSession, - } - return exec.ExecRestrictedSQL(ctx, optFuncs, sql, params...) +func (h *Handle) execRows(sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, rerr error) { + _ = h.callWithSCtx(func(sctx sessionctx.Context) error { + rows, fields, rerr = util.ExecRows(sctx, sql, args...) + return nil }) + return } // Clear the statsCache, only for test. @@ -243,8 +223,7 @@ func (h *Handle) Update(is infoschema.InfoSchema) error { } else { lastVersion = 0 } - ctx := util.StatsCtx(context.Background()) - rows, _, err := h.execRestrictedSQL(ctx, "SELECT version, table_id, modify_count, count from mysql.stats_meta where version > %? order by version", lastVersion) + rows, _, err := h.execRows("SELECT version, table_id, modify_count, count from mysql.stats_meta where version > %? order by version", lastVersion) if err != nil { return errors.Trace(err) } @@ -680,8 +659,7 @@ type colStatsTimeInfo struct { // getDisableColumnTrackingTime reads the value of tidb_disable_column_tracking_time from mysql.tidb if it exists. func (h *Handle) getDisableColumnTrackingTime() (*time.Time, error) { - ctx := util.StatsCtx(context.Background()) - rows, fields, err := h.execRestrictedSQL(ctx, "SELECT variable_value FROM %n.%n WHERE variable_name = %?", mysql.SystemDB, mysql.TiDBTable, variable.TiDBDisableColumnTrackingTime) + rows, fields, err := h.execRows("SELECT variable_value FROM %n.%n WHERE variable_name = %?", mysql.SystemDB, mysql.TiDBTable, variable.TiDBDisableColumnTrackingTime) if err != nil { return nil, err } @@ -707,9 +685,8 @@ func (h *Handle) LoadColumnStatsUsage(loc *time.Location) (map[model.TableItemID if err != nil { return nil, errors.Trace(err) } - ctx := util.StatsCtx(context.Background()) // Since we use another session from session pool to read mysql.column_stats_usage, which may have different @@time_zone, so we do time zone conversion here. - rows, _, err := h.execRestrictedSQL(ctx, "SELECT table_id, column_id, CONVERT_TZ(last_used_at, @@TIME_ZONE, '+00:00'), CONVERT_TZ(last_analyzed_at, @@TIME_ZONE, '+00:00') FROM mysql.column_stats_usage") + rows, _, err := h.execRows("SELECT table_id, column_id, CONVERT_TZ(last_used_at, @@TIME_ZONE, '+00:00'), CONVERT_TZ(last_analyzed_at, @@TIME_ZONE, '+00:00') FROM mysql.column_stats_usage") if err != nil { return nil, errors.Trace(err) } @@ -747,9 +724,8 @@ func (h *Handle) LoadColumnStatsUsage(loc *time.Location) (map[model.TableItemID // CollectColumnsInExtendedStats returns IDs of the columns involved in extended stats. func (h *Handle) CollectColumnsInExtendedStats(tableID int64) ([]int64, error) { - ctx := util.StatsCtx(context.Background()) const sql = "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)" - rows, _, err := h.execRestrictedSQL(ctx, sql, tableID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited) + rows, _, err := h.execRows(sql, tableID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited) if err != nil { return nil, errors.Trace(err) } @@ -776,8 +752,7 @@ func (h *Handle) GetPredicateColumns(tableID int64) ([]int64, error) { if err != nil { return nil, errors.Trace(err) } - ctx := util.StatsCtx(context.Background()) - rows, _, err := h.execRestrictedSQL(ctx, "SELECT column_id, CONVERT_TZ(last_used_at, @@TIME_ZONE, '+00:00') FROM mysql.column_stats_usage WHERE table_id = %? AND last_used_at IS NOT NULL", tableID) + rows, _, err := h.execRows("SELECT column_id, CONVERT_TZ(last_used_at, @@TIME_ZONE, '+00:00') FROM mysql.column_stats_usage WHERE table_id = %? AND last_used_at IS NOT NULL", tableID) if err != nil { return nil, errors.Trace(err) } @@ -805,7 +780,6 @@ const maxColumnSize = 6 << 20 // RecordHistoricalStatsToStorage records the given table's stats data to mysql.stats_history func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) { - ctx := util.StatsCtx(context.Background()) var js *storage.JSONTable var err error if isPartition { @@ -837,20 +811,20 @@ func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model. return 0, err } defer h.pool.Put(se) - exec := se.(sqlexec.SQLExecutor) + sctx := se.(sessionctx.Context) - _, err = exec.ExecuteInternal(ctx, "begin pessimistic") + _, err = util.Exec(sctx, "begin pessimistic") if err != nil { return version, errors.Trace(err) } defer func() { - err = util.FinishTransaction(ctx, exec, err) + err = util.FinishTransaction(sctx, err) }() ts := time.Now().Format("2006-01-02 15:04:05.999999") const sql = "INSERT INTO mysql.stats_history(table_id, stats_data, seq_no, version, create_time) VALUES (%?, %?, %?, %?, %?)" for i := 0; i < len(blocks); i++ { - if _, err := exec.ExecuteInternal(ctx, sql, physicalID, blocks[i], i, version, ts); err != nil { + if _, err := util.Exec(sctx, sql, physicalID, blocks[i], i, version, ts); err != nil { return version, errors.Trace(err) } } @@ -878,20 +852,19 @@ func (h *Handle) InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, p return err } defer h.pool.Put(se) - exec := se.(sqlexec.RestrictedSQLExecutor) - ctx := util.StatsCtx(context.Background()) + sctx := se.(sessionctx.Context) jobInfo := job.JobInfo const textMaxLength = 65535 if len(jobInfo) > textMaxLength { jobInfo = jobInfo[:textMaxLength] } const insertJob = "INSERT INTO mysql.analyze_jobs (table_schema, table_name, partition_name, job_info, state, instance, process_id) VALUES (%?, %?, %?, %?, %?, %?, %?)" - _, _, err = exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, insertJob, job.DBName, job.TableName, job.PartitionName, jobInfo, statistics.AnalyzePending, instance, procID) + _, _, err = util.ExecRows(sctx, insertJob, job.DBName, job.TableName, job.PartitionName, jobInfo, statistics.AnalyzePending, instance, procID) if err != nil { return err } const getJobID = "SELECT LAST_INSERT_ID()" - rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, getJobID) + rows, _, err := util.ExecRows(sctx, getJobID) if err != nil { return err } @@ -913,8 +886,7 @@ func (h *Handle) InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, p // DeleteAnalyzeJobs deletes the analyze jobs whose update time is earlier than updateTime. func (h *Handle) DeleteAnalyzeJobs(updateTime time.Time) error { - ctx := util.StatsCtx(context.Background()) - _, _, err := h.execRestrictedSQL(ctx, "DELETE FROM mysql.analyze_jobs WHERE update_time < CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)", updateTime.UTC().Format(types.TimeFormat)) + _, _, err := h.execRows("DELETE FROM mysql.analyze_jobs WHERE update_time < CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)", updateTime.UTC().Format(types.TimeFormat)) return err } diff --git a/statistics/handle/history/BUILD.bazel b/statistics/handle/history/BUILD.bazel index 5f3b380952..3830a3ce51 100644 --- a/statistics/handle/history/BUILD.bazel +++ b/statistics/handle/history/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "//sessionctx", "//statistics/handle/cache", "//statistics/handle/util", - "//util/sqlexec", "@com_github_pingcap_errors//:errors", ], ) diff --git a/statistics/handle/history/history_stats.go b/statistics/handle/history/history_stats.go index a844a3ed2c..4b083250a4 100644 --- a/statistics/handle/history/history_stats.go +++ b/statistics/handle/history/history_stats.go @@ -15,13 +15,10 @@ package history import ( - "context" - "github.com/pingcap/errors" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics/handle/cache" "github.com/pingcap/tidb/statistics/handle/util" - "github.com/pingcap/tidb/util/sqlexec" ) // RecordHistoricalStatsMeta records the historical stats meta. @@ -32,10 +29,7 @@ func RecordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version u if !sctx.GetSessionVars().EnableHistoricalStats { return nil } - ctx := util.StatsCtx(context.Background()) - exec := sctx.(sqlexec.SQLExecutor) - rexec := sctx.(sqlexec.RestrictedSQLExecutor) - rows, _, err := rexec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", tableID, version) + rows, _, err := util.ExecRows(sctx, "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", tableID, version) if err != nil { return errors.Trace(err) } @@ -44,16 +38,16 @@ func RecordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version u } modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1) - _, err = exec.ExecuteInternal(ctx, "begin pessimistic") + _, err = util.Exec(sctx, "begin pessimistic") if err != nil { return errors.Trace(err) } defer func() { - err = util.FinishTransaction(ctx, exec, err) + err = util.FinishTransaction(sctx, err) }() const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time) VALUES (%?, %?, %?, %?, %?, NOW())" - if _, err := exec.ExecuteInternal(ctx, sql, tableID, modifyCount, count, version, source); err != nil { + if _, err := util.Exec(sctx, sql, tableID, modifyCount, count, version, source); err != nil { return errors.Trace(err) } cache.TableRowStatsCache.Invalidate(tableID) diff --git a/statistics/handle/storage/read.go b/statistics/handle/storage/read.go index eeffcf4aa6..436e061f38 100644 --- a/statistics/handle/storage/read.go +++ b/statistics/handle/storage/read.go @@ -596,14 +596,13 @@ func loadNeededIndexHistograms(reader *StatsReader, statsCache *cache.StatsCache // StatsMetaByTableIDFromStorage gets the stats meta of a table from storage. func StatsMetaByTableIDFromStorage(sctx sessionctx.Context, tableID int64, snapshot uint64) (version uint64, modifyCount, count int64, err error) { - ctx := util.StatsCtx(context.Background()) var rows []chunk.Row - exec := sctx.(sqlexec.RestrictedSQLExecutor) if snapshot == 0 { - rows, _, err = exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, + rows, _, err = util.ExecRows(sctx, "SELECT version, modify_count, count from mysql.stats_meta where table_id = %? order by version", tableID) } else { - rows, _, err = exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession, sqlexec.ExecOptionWithSnapshot(snapshot)}, + rows, _, err = util.ExecWithOpts(sctx, + []sqlexec.OptionFuncAlias{sqlexec.ExecOptionWithSnapshot(snapshot), sqlexec.ExecOptionUseCurSession}, "SELECT version, modify_count, count from mysql.stats_meta where table_id = %? order by version", tableID) } if err != nil || len(rows) == 0 { diff --git a/statistics/handle/storage/save.go b/statistics/handle/storage/save.go index 49a0053bba..f7b6422cc2 100644 --- a/statistics/handle/storage/save.go +++ b/statistics/handle/storage/save.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle/cache" "github.com/pingcap/tidb/statistics/handle/util" @@ -41,7 +40,7 @@ const batchInsertSize = 10 // maxInsertLength is the length limit for internal insert SQL. const maxInsertLength = 1024 * 1024 -func saveTopNToStorage(ctx context.Context, exec sqlexec.SQLExecutor, tableID int64, isIndex int, histID int64, topN *statistics.TopN) error { +func saveTopNToStorage(sctx sessionctx.Context, tableID int64, isIndex int, histID int64, topN *statistics.TopN) error { if topN == nil { return nil } @@ -65,17 +64,18 @@ func saveTopNToStorage(ctx context.Context, exec sqlexec.SQLExecutor, tableID in sql.WriteString(val) } i = end - if _, err := exec.ExecuteInternal(ctx, sql.String()); err != nil { + if _, err := util.Exec(sctx, sql.String()); err != nil { return err } } return nil } -func saveBucketsToStorage(ctx context.Context, exec sqlexec.SQLExecutor, sc *stmtctx.StatementContext, tableID int64, isIndex int, hg *statistics.Histogram) (lastAnalyzePos []byte, err error) { +func saveBucketsToStorage(sctx sessionctx.Context, tableID int64, isIndex int, hg *statistics.Histogram) (lastAnalyzePos []byte, err error) { if hg == nil { return } + sc := sctx.GetSessionVars().StmtCtx for i := 0; i < len(hg.Buckets); { end := i + batchInsertSize if end > len(hg.Buckets) { @@ -113,7 +113,7 @@ func saveBucketsToStorage(ctx context.Context, exec sqlexec.SQLExecutor, sc *stm sql.WriteString(val) } i = end - if _, err = exec.ExecuteInternal(ctx, sql.String()); err != nil { + if _, err = util.Exec(sctx, sql.String()); err != nil { return } } @@ -139,13 +139,12 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, } }() ctx := util.StatsCtx(context.Background()) - exec := sctx.(sqlexec.SQLExecutor) - _, err = exec.ExecuteInternal(ctx, "begin pessimistic") + _, err = util.Exec(sctx, "begin pessimistic") if err != nil { return err } defer func() { - err = util.FinishTransaction(ctx, exec, err) + err = util.FinishTransaction(sctx, err) }() txn, err := sctx.Txn(true) if err != nil { @@ -155,7 +154,7 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, // 1. Save mysql.stats_meta. var rs sqlexec.RecordSet // Lock this row to prevent writing of concurrent analyze. - rs, err = exec.ExecuteInternal(ctx, "select snapshot, count, modify_count from mysql.stats_meta where table_id = %? for update", tableID) + rs, err = util.Exec(sctx, "select snapshot, count, modify_count from mysql.stats_meta where table_id = %? for update", tableID) if err != nil { return err } @@ -192,7 +191,7 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, snapShot = 0 count = 0 } - if _, err = exec.ExecuteInternal(ctx, + if _, err = util.Exec(sctx, "replace into mysql.stats_meta (version, table_id, count, snapshot) values (%?, %?, %?, %?)", version, tableID, @@ -205,7 +204,7 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, } else if results.ForMVIndex { // 1-2. There's already an existing record for this table, and we are handling stats for mv index now. // In this case, we only update the version. See comments for AnalyzeResults.ForMVIndex for more details. - if _, err = exec.ExecuteInternal(ctx, + if _, err = util.Exec(sctx, "update mysql.stats_meta set version=%? where table_id=%?", version, tableID, @@ -245,7 +244,7 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, zap.Int64("results.Count", results.Count), zap.Int64("count", cnt)) } - if _, err = exec.ExecuteInternal(ctx, + if _, err = util.Exec(sctx, "update mysql.stats_meta set version=%?, modify_count=%?, count=%?, snapshot=%? where table_id=%?", version, modifyCnt, @@ -278,40 +277,39 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, return err } // Delete outdated data - if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil { + if _, err = util.Exec(sctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil { return err } - if err = saveTopNToStorage(ctx, exec, tableID, result.IsIndex, hg.ID, result.TopNs[i]); err != nil { + if err = saveTopNToStorage(sctx, tableID, result.IsIndex, hg.ID, result.TopNs[i]); err != nil { return err } - if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil { + if _, err := util.Exec(sctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil { return err } if fmSketch != nil && needDumpFMS { - if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_fm_sketch (table_id, is_index, hist_id, value) values (%?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, fmSketch); err != nil { + if _, err = util.Exec(sctx, "insert into mysql.stats_fm_sketch (table_id, is_index, hist_id, value) values (%?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, fmSketch); err != nil { return err } } - if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)", + if _, err = util.Exec(sctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, hg.NDV, version, hg.NullCount, cmSketch, hg.TotColSize, results.StatsVer, statistics.AnalyzeFlag, hg.Correlation); err != nil { return err } - if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil { + if _, err = util.Exec(sctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil { return err } - sc := sctx.GetSessionVars().StmtCtx var lastAnalyzePos []byte - lastAnalyzePos, err = saveBucketsToStorage(ctx, exec, sc, tableID, result.IsIndex, hg) + lastAnalyzePos, err = saveBucketsToStorage(sctx, tableID, result.IsIndex, hg) if err != nil { return err } if len(lastAnalyzePos) > 0 { - if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, result.IsIndex, hg.ID); err != nil { + if _, err = util.Exec(sctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, result.IsIndex, hg.ID); err != nil { return err } } if result.IsIndex == 0 { - if _, err = exec.ExecuteInternal(ctx, "insert into mysql.column_stats_usage (table_id, column_id, last_analyzed_at) values(%?, %?, current_timestamp()) on duplicate key update last_analyzed_at = values(last_analyzed_at)", tableID, hg.ID); err != nil { + if _, err = util.Exec(sctx, "insert into mysql.column_stats_usage (table_id, column_id, last_analyzed_at) values(%?, %?, current_timestamp()) on duplicate key update last_analyzed_at = values(last_analyzed_at)", tableID, hg.ID); err != nil { return err } } @@ -336,7 +334,7 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, case ast.StatsTypeDependency: statsStr = item.StringVals } - if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, %?, %?)", name, item.Tp, tableID, strColIDs, statsStr, version, statistics.ExtendedStatsAnalyzed); err != nil { + if _, err = util.Exec(sctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, %?, %?)", name, item.Tp, tableID, strColIDs, statsStr, version, statistics.ExtendedStatsAnalyzed); err != nil { return err } } @@ -358,14 +356,12 @@ func SaveStatsToStorage(sctx sessionctx.Context, } }() - exec := sctx.(sqlexec.SQLExecutor) - ctx := util.StatsCtx(context.Background()) - _, err = exec.ExecuteInternal(ctx, "begin pessimistic") + _, err = util.Exec(sctx, "begin pessimistic") if err != nil { return errors.Trace(err) } defer func() { - err = util.FinishTransaction(ctx, exec, err) + err = util.FinishTransaction(sctx, err) }() version, err := util.GetStartTS(sctx) if err != nil { @@ -374,10 +370,10 @@ func SaveStatsToStorage(sctx sessionctx.Context, // If the count is less than 0, then we do not want to update the modify count and count. if count >= 0 { - _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count, modify_count) values (%?, %?, %?, %?)", version, tableID, count, modifyCount) + _, err = util.Exec(sctx, "replace into mysql.stats_meta (version, table_id, count, modify_count) values (%?, %?, %?, %?)", version, tableID, count, modifyCount) cache.TableRowStatsCache.Invalidate(tableID) } else { - _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %?", version, tableID) + _, err = util.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %?", version, tableID) } if err != nil { return err @@ -388,39 +384,38 @@ func SaveStatsToStorage(sctx sessionctx.Context, return err } // Delete outdated data - if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { + if _, err = util.Exec(sctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { return err } - if err = saveTopNToStorage(ctx, exec, tableID, isIndex, hg.ID, topN); err != nil { + if err = saveTopNToStorage(sctx, tableID, isIndex, hg.ID, topN); err != nil { return err } - if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { + if _, err := util.Exec(sctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { return err } flag := 0 if isAnalyzed == 1 { flag = statistics.AnalyzeFlag } - if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)", + if _, err = util.Exec(sctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, cmSketch, hg.TotColSize, statsVersion, flag, hg.Correlation); err != nil { return err } - if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { + if _, err = util.Exec(sctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { return err } - sc := sctx.GetSessionVars().StmtCtx var lastAnalyzePos []byte - lastAnalyzePos, err = saveBucketsToStorage(ctx, exec, sc, tableID, isIndex, hg) + lastAnalyzePos, err = saveBucketsToStorage(sctx, tableID, isIndex, hg) if err != nil { return err } if isAnalyzed == 1 && len(lastAnalyzePos) > 0 { - if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, isIndex, hg.ID); err != nil { + if _, err = util.Exec(sctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, isIndex, hg.ID); err != nil { return err } } if updateAnalyzeTime && isIndex == 0 { - if _, err = exec.ExecuteInternal(ctx, "insert into mysql.column_stats_usage (table_id, column_id, last_analyzed_at) values(%?, %?, current_timestamp()) on duplicate key update last_analyzed_at = current_timestamp()", tableID, hg.ID); err != nil { + if _, err = util.Exec(sctx, "insert into mysql.column_stats_usage (table_id, column_id, last_analyzed_at) values(%?, %?, current_timestamp()) on duplicate key update last_analyzed_at = current_timestamp()", tableID, hg.ID); err != nil { return err } } @@ -439,20 +434,18 @@ func SaveMetaToStorage( } }() - exec := sctx.(sqlexec.SQLExecutor) - ctx := util.StatsCtx(context.Background()) - _, err = exec.ExecuteInternal(ctx, "begin") + _, err = util.Exec(sctx, "begin") if err != nil { return errors.Trace(err) } defer func() { - err = util.FinishTransaction(ctx, exec, err) + err = util.FinishTransaction(sctx, err) }() version, err := util.GetStartTS(sctx) if err != nil { return errors.Trace(err) } - _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count, modify_count) values (%?, %?, %?, %?)", version, tableID, count, modifyCount) + _, err = util.Exec(sctx, "replace into mysql.stats_meta (version, table_id, count, modify_count) values (%?, %?, %?, %?)", version, tableID, count, modifyCount) statsVer = version cache.TableRowStatsCache.Invalidate(tableID) return err diff --git a/statistics/handle/update.go b/statistics/handle/update.go index aa22c1a7f7..ea957eda98 100644 --- a/statistics/handle/update.go +++ b/statistics/handle/update.go @@ -16,7 +16,6 @@ package handle import ( "cmp" - "context" "fmt" "slices" "strings" @@ -284,15 +283,13 @@ func (h *Handle) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableI return false, err } defer h.pool.Put(se) - exec := se.(sqlexec.SQLExecutor) sctx := se.(sessionctx.Context) - ctx := utilstats.StatsCtx(context.Background()) - _, err = exec.ExecuteInternal(ctx, "begin") + _, err = utilstats.Exec(sctx, "begin") if err != nil { return false, errors.Trace(err) } defer func() { - err = utilstats.FinishTransaction(ctx, exec, err) + err = utilstats.FinishTransaction(sctx, err) }() statsVersion, err = getSessionTxnStartTS(se) @@ -327,7 +324,7 @@ func (h *Handle) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableI isPartitionLocked = true } tableOrPartitionLocked := isTableLocked || isPartitionLocked - if err = updateStatsMeta(ctx, exec, statsVersion, delta, + if err = updateStatsMeta(sctx, statsVersion, delta, physicalTableID, tableOrPartitionLocked); err != nil { return } @@ -336,7 +333,7 @@ func (h *Handle) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableI // We will update its global-stats when the partition is unlocked. if isTableLocked || !isPartitionLocked { // If it's a partitioned table and its global-stats exists, update its count and modify_count as well. - if err = updateStatsMeta(ctx, exec, statsVersion, delta, tableID, isTableLocked); err != nil { + if err = updateStatsMeta(sctx, statsVersion, delta, tableID, isTableLocked); err != nil { return } affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows() @@ -348,7 +345,7 @@ func (h *Handle) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableI if _, ok := lockedTables[physicalTableID]; ok { isTableLocked = true } - if err = updateStatsMeta(ctx, exec, statsVersion, delta, + if err = updateStatsMeta(sctx, statsVersion, delta, physicalTableID, isTableLocked); err != nil { return } @@ -360,8 +357,7 @@ func (h *Handle) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableI } func updateStatsMeta( - ctx context.Context, - exec sqlexec.SQLExecutor, + sctx sessionctx.Context, startTS uint64, delta variable.TableDelta, id int64, @@ -369,21 +365,21 @@ func updateStatsMeta( ) (err error) { if isLocked { if delta.Delta < 0 { - _, err = exec.ExecuteInternal(ctx, "update mysql.stats_table_locked set version = %?, count = count - %?, modify_count = modify_count + %? where table_id = %? and count >= %?", + _, err = utilstats.Exec(sctx, "update mysql.stats_table_locked set version = %?, count = count - %?, modify_count = modify_count + %? where table_id = %? and count >= %?", startTS, -delta.Delta, delta.Count, id, -delta.Delta) } else { - _, err = exec.ExecuteInternal(ctx, "update mysql.stats_table_locked set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", + _, err = utilstats.Exec(sctx, "update mysql.stats_table_locked set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", startTS, delta.Delta, delta.Count, id) } } else { if delta.Delta < 0 { // use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta. - _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, 0) on duplicate key "+ + _, err = utilstats.Exec(sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, 0) on duplicate key "+ "update version = values(version), modify_count = modify_count + values(modify_count), count = if(count > %?, count - %?, 0)", startTS, id, delta.Count, -delta.Delta, -delta.Delta) } else { // use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta. - _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+ + _, err = utilstats.Exec(sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+ "update version = values(version), modify_count = modify_count + values(modify_count), count = count + values(count)", startTS, id, delta.Count, delta.Delta) } @@ -406,10 +402,9 @@ func (h *Handle) dumpTableStatColSizeToKV(id int64, delta variable.TableDelta) e if len(values) == 0 { return nil } - ctx := utilstats.StatsCtx(context.Background()) sql := fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, tot_col_size) "+ "values %s on duplicate key update tot_col_size = tot_col_size + values(tot_col_size)", strings.Join(values, ",")) - _, _, err := h.execRestrictedSQL(ctx, sql) + _, _, err := h.execRows(sql) return errors.Trace(err) } @@ -454,7 +449,7 @@ func (h *Handle) DumpColStatsUsageToKV() error { } } sqlexec.MustFormatSQL(sql, " ON DUPLICATE KEY UPDATE last_used_at = CASE WHEN last_used_at IS NULL THEN VALUES(last_used_at) ELSE GREATEST(last_used_at, VALUES(last_used_at)) END") - if _, _, err := h.execRestrictedSQL(context.Background(), sql.String()); err != nil { + if _, _, err := h.execRows(sql.String()); err != nil { return errors.Trace(err) } for j := i; j < end; j++ { diff --git a/statistics/handle/usage/index_usage.go b/statistics/handle/usage/index_usage.go index d97d7d7951..6d83a43045 100644 --- a/statistics/handle/usage/index_usage.go +++ b/statistics/handle/usage/index_usage.go @@ -15,7 +15,6 @@ package usage import ( - "context" "strings" "sync" "time" @@ -146,15 +145,8 @@ func sweepIdxUsageList(listHead *SessionIndexUsageCollector) indexUsageMap { // batchInsertSize is the batch size used by internal SQL to insert values to some system table. const batchInsertSize = 10 -var ( - // useCurrentSession to make sure the sql is executed in current session. - useCurrentSession = []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession} -) - // DumpIndexUsageToKV will dump in-memory index usage information to KV. func DumpIndexUsageToKV(sctx sessionctx.Context, listHead *SessionIndexUsageCollector) error { - ctx := util.StatsCtx(context.Background()) - exec := sctx.(sqlexec.RestrictedSQLExecutor) mapper := sweepIdxUsageList(listHead) type FullIndexUsageInformation struct { information IndexUsageInformation @@ -180,7 +172,7 @@ func DumpIndexUsageToKV(sctx sessionctx.Context, listHead *SessionIndexUsageColl } } sqlexec.MustFormatSQL(sql, "on duplicate key update query_count=query_count+values(query_count),rows_selected=rows_selected+values(rows_selected),last_used_at=greatest(last_used_at, values(last_used_at))") - if _, _, err := exec.ExecRestrictedSQL(ctx, useCurrentSession, sql.String()); err != nil { + if _, _, err := util.ExecRows(sctx, sql.String()); err != nil { return errors.Trace(err) } } @@ -193,8 +185,6 @@ func GCIndexUsageOnKV(sctx sessionctx.Context) error { // We periodically delete the usage information of non-existent indexes through information_schema.tidb_indexes. // This sql will delete the usage information of those indexes that not in information_schema.tidb_indexes. sql := `delete from mysql.SCHEMA_INDEX_USAGE as stats where stats.index_id not in (select idx.index_id from information_schema.tidb_indexes as idx)` - ctx := util.StatsCtx(context.Background()) - exec := sctx.(sqlexec.RestrictedSQLExecutor) - _, _, err := exec.ExecRestrictedSQL(ctx, useCurrentSession, sql) + _, _, err := util.ExecRows(sctx, sql) return err } diff --git a/statistics/handle/util/util.go b/statistics/handle/util/util.go index ad85530451..ec6d364140 100644 --- a/statistics/handle/util/util.go +++ b/statistics/handle/util/util.go @@ -32,15 +32,11 @@ func StatsCtx(ctx context.Context) context.Context { } // FinishTransaction will execute `commit` when error is nil, otherwise `rollback`. -func FinishTransaction(ctx context.Context, exec interface{}, err error) error { - sqlExec, ok := exec.(sqlexec.SQLExecutor) - if !ok { - return errors.Errorf("invalid sql executor") - } +func FinishTransaction(sctx sessionctx.Context, err error) error { if err == nil { - _, err = sqlExec.ExecuteInternal(ctx, "commit") + _, err = Exec(sctx, "commit") } else { - _, err1 := sqlExec.ExecuteInternal(ctx, "rollback") + _, err1 := Exec(sctx, "rollback") terror.Log(errors.Trace(err1)) } return errors.Trace(err) @@ -55,11 +51,30 @@ func GetStartTS(sctx sessionctx.Context) (uint64, error) { return txn.StartTS(), nil } -// Read is a helper function to execute sql and return rows and fields. -func Read(exec interface{}, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) { - sqlExec, ok := exec.(sqlexec.RestrictedSQLExecutor) +// Exec is a helper function to execute sql and return RecordSet. +func Exec(sctx sessionctx.Context, sql string, args ...interface{}) (sqlexec.RecordSet, error) { + sqlExec, ok := sctx.(sqlexec.SQLExecutor) + if !ok { + return nil, errors.Errorf("invalid sql executor") + } + // TODO: use RestrictedSQLExecutor + ExecOptionUseCurSession instead of SQLExecutor + return sqlExec.ExecuteInternal(StatsCtx(context.Background()), sql, args...) +} + +// ExecRows is a helper function to execute sql and return rows and fields. +func ExecRows(sctx sessionctx.Context, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) { + sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor) if !ok { return nil, nil, errors.Errorf("invalid sql executor") } return sqlExec.ExecRestrictedSQL(StatsCtx(context.Background()), []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, args...) } + +// ExecWithOpts is a helper function to execute sql and return rows and fields. +func ExecWithOpts(sctx sessionctx.Context, opts []sqlexec.OptionFuncAlias, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) { + sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor) + if !ok { + return nil, nil, errors.Errorf("invalid sql executor") + } + return sqlExec.ExecRestrictedSQL(StatsCtx(context.Background()), opts, sql, args...) +}