statistics: support download history stats from stats_history (#39701)
This commit is contained in:
@ -15,6 +15,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
@ -24,8 +25,8 @@ import (
|
||||
"github.com/pingcap/tidb/parser/model"
|
||||
"github.com/pingcap/tidb/parser/mysql"
|
||||
"github.com/pingcap/tidb/session"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/types"
|
||||
"github.com/pingcap/tidb/util/gcutil"
|
||||
"github.com/tikv/client-go/v2/oracle"
|
||||
)
|
||||
|
||||
@ -105,14 +106,14 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
|
||||
return
|
||||
}
|
||||
defer se.Close()
|
||||
|
||||
dumpPartitionStats := true
|
||||
if len(params[pDumpPartitionStats]) > 0 {
|
||||
dumpPartitionStats, err = strconv.ParseBool(params[pDumpPartitionStats])
|
||||
if err != nil {
|
||||
writeError(w, err)
|
||||
return
|
||||
}
|
||||
enabeld, err := sh.do.StatsHandle().CheckHistoricalStatsEnable()
|
||||
if err != nil {
|
||||
writeError(w, err)
|
||||
return
|
||||
}
|
||||
if !enabeld {
|
||||
writeError(w, fmt.Errorf("%v should be enabled", variable.TiDBEnableHistoricalStats))
|
||||
return
|
||||
}
|
||||
|
||||
se.GetSessionVars().StmtCtx.TimeZone = time.Local
|
||||
@ -127,12 +128,6 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
|
||||
return
|
||||
}
|
||||
snapshot := oracle.GoTimeToTS(t1)
|
||||
err = gcutil.ValidateSnapshot(se, snapshot)
|
||||
if err != nil {
|
||||
writeError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
is, err := sh.do.GetSnapshotInfoSchema(snapshot)
|
||||
if err != nil {
|
||||
writeError(w, err)
|
||||
@ -144,7 +139,7 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
|
||||
writeError(w, err)
|
||||
return
|
||||
}
|
||||
js, err := h.DumpStatsToJSONBySnapshot(params[pDBName], tbl.Meta(), snapshot, dumpPartitionStats)
|
||||
js, err := h.DumpHistoricalStatsBySnapshot(params[pDBName], tbl.Meta(), snapshot)
|
||||
if err != nil {
|
||||
writeError(w, err)
|
||||
} else {
|
||||
|
||||
@ -25,6 +25,7 @@ import (
|
||||
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/pingcap/tidb/parser/model"
|
||||
"github.com/pingcap/tidb/session"
|
||||
"github.com/pingcap/tidb/statistics/handle"
|
||||
"github.com/pingcap/tidb/testkit"
|
||||
@ -59,6 +60,10 @@ func TestDumpStatsAPI(t *testing.T) {
|
||||
statsHandler := &StatsHandler{dom}
|
||||
|
||||
prepareData(t, client, statsHandler)
|
||||
tableInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("tidb"), model.NewCIStr("test"))
|
||||
require.NoError(t, err)
|
||||
err = dom.GetHistoricalStatsWorker().DumpHistoricalStats(tableInfo.Meta().ID, dom.StatsHandle())
|
||||
require.NoError(t, err)
|
||||
|
||||
router := mux.NewRouter()
|
||||
router.Handle("/stats/dump/{db}/{table}", statsHandler)
|
||||
@ -168,6 +173,7 @@ func prepareData(t *testing.T, client *testServerClient, statHandle *StatsHandle
|
||||
tk.MustExec("insert test values (1, 's')")
|
||||
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
|
||||
tk.MustExec("analyze table test")
|
||||
tk.MustExec("set global tidb_enable_historical_stats = 1")
|
||||
tk.MustExec("insert into test(a,b) values (1, 'v'),(3, 'vvv'),(5, 'vv')")
|
||||
is := statHandle.do.InfoSchema()
|
||||
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
|
||||
|
||||
@ -221,7 +221,7 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (e
|
||||
statsVer := uint64(0)
|
||||
defer func() {
|
||||
if err == nil && statsVer != 0 {
|
||||
err = h.recordHistoricalStatsMeta(physicalID, statsVer)
|
||||
h.recordHistoricalStatsMeta(physicalID, statsVer)
|
||||
}
|
||||
}()
|
||||
h.mu.Lock()
|
||||
@ -263,7 +263,7 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
|
||||
statsVer := uint64(0)
|
||||
defer func() {
|
||||
if err == nil && statsVer != 0 {
|
||||
err = h.recordHistoricalStatsMeta(physicalID, statsVer)
|
||||
h.recordHistoricalStatsMeta(physicalID, statsVer)
|
||||
}
|
||||
}()
|
||||
h.mu.Lock()
|
||||
|
||||
@ -18,6 +18,7 @@ import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"time"
|
||||
|
||||
@ -130,6 +131,42 @@ func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo,
|
||||
return h.DumpStatsToJSONBySnapshot(dbName, tableInfo, snapshot, dumpPartitionStats)
|
||||
}
|
||||
|
||||
// DumpHistoricalStatsBySnapshot dumped json tables from mysql.stats_meta_history and mysql.stats_history
|
||||
func (h *Handle) DumpHistoricalStatsBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64) (*JSONTable, error) {
|
||||
pi := tableInfo.GetPartitionInfo()
|
||||
if pi == nil {
|
||||
return h.tableHistoricalStatsToJSON(tableInfo.ID, snapshot)
|
||||
}
|
||||
jsonTbl := &JSONTable{
|
||||
DatabaseName: dbName,
|
||||
TableName: tableInfo.Name.L,
|
||||
Partitions: make(map[string]*JSONTable, len(pi.Definitions)),
|
||||
}
|
||||
for _, def := range pi.Definitions {
|
||||
tbl, err := h.tableHistoricalStatsToJSON(def.ID, snapshot)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if tbl == nil {
|
||||
continue
|
||||
}
|
||||
jsonTbl.Partitions[def.Name.L] = tbl
|
||||
}
|
||||
h.mu.Lock()
|
||||
isDynamicMode := variable.PartitionPruneMode(h.mu.ctx.GetSessionVars().PartitionPruneMode.Load()) == variable.Dynamic
|
||||
h.mu.Unlock()
|
||||
if isDynamicMode {
|
||||
tbl, err := h.tableHistoricalStatsToJSON(tableInfo.ID, snapshot)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if tbl != nil {
|
||||
jsonTbl.Partitions["global"] = tbl
|
||||
}
|
||||
}
|
||||
return jsonTbl, nil
|
||||
}
|
||||
|
||||
// DumpStatsToJSONBySnapshot dumps statistic to json.
|
||||
func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64, dumpPartitionStats bool) (*JSONTable, error) {
|
||||
h.mu.Lock()
|
||||
@ -194,6 +231,62 @@ func GenJSONTableFromStats(dbName string, tableInfo *model.TableInfo, tbl *stati
|
||||
return jsonTbl, nil
|
||||
}
|
||||
|
||||
func (h *Handle) tableHistoricalStatsToJSON(physicalID int64, snapshot uint64) (*JSONTable, error) {
|
||||
reader, err := h.getGlobalStatsReader(0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
err1 := h.releaseGlobalStatsReader(reader)
|
||||
if err == nil && err1 != nil {
|
||||
err = err1
|
||||
}
|
||||
}()
|
||||
|
||||
// get meta version
|
||||
rows, _, err := reader.read("select distinct version from mysql.stats_meta_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
|
||||
if err != nil {
|
||||
return nil, errors.AddStack(err)
|
||||
}
|
||||
if len(rows) < 1 {
|
||||
return nil, fmt.Errorf("failed to get records of stats_meta_history for table_id = %v, snapshot = %v", physicalID, snapshot)
|
||||
}
|
||||
statsMetaVersion := rows[0].GetInt64(0)
|
||||
// get stats meta
|
||||
rows, _, err = reader.read("select modify_count, count from mysql.stats_meta_history where table_id = %? and version = %?", physicalID, statsMetaVersion)
|
||||
if err != nil {
|
||||
return nil, errors.AddStack(err)
|
||||
}
|
||||
modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)
|
||||
|
||||
// get stats version
|
||||
rows, _, err = reader.read("select distinct version from mysql.stats_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
|
||||
if err != nil {
|
||||
return nil, errors.AddStack(err)
|
||||
}
|
||||
if len(rows) < 1 {
|
||||
return nil, fmt.Errorf("failed to get record of stats_history for table_id = %v, snapshot = %v", physicalID, snapshot)
|
||||
}
|
||||
statsVersion := rows[0].GetInt64(0)
|
||||
|
||||
// get stats
|
||||
rows, _, err = reader.read("select stats_data from mysql.stats_history where table_id = %? and version = %? order by seq_no", physicalID, statsVersion)
|
||||
if err != nil {
|
||||
return nil, errors.AddStack(err)
|
||||
}
|
||||
blocks := make([][]byte, 0)
|
||||
for _, row := range rows {
|
||||
blocks = append(blocks, row.GetBytes(0))
|
||||
}
|
||||
jsonTbl, err := BlocksToJSONTable(blocks)
|
||||
if err != nil {
|
||||
return nil, errors.AddStack(err)
|
||||
}
|
||||
jsonTbl.Count = count
|
||||
jsonTbl.ModifyCount = modifyCount
|
||||
return jsonTbl, nil
|
||||
}
|
||||
|
||||
func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*JSONTable, error) {
|
||||
tbl, err := h.TableStatsFromStorage(tableInfo, physicalID, true, snapshot)
|
||||
if err != nil || tbl == nil {
|
||||
|
||||
@ -1619,7 +1619,7 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, ana
|
||||
statsVer := uint64(0)
|
||||
defer func() {
|
||||
if err == nil && statsVer != 0 {
|
||||
err = h.recordHistoricalStatsMeta(tableID, statsVer)
|
||||
h.recordHistoricalStatsMeta(tableID, statsVer)
|
||||
}
|
||||
}()
|
||||
h.mu.Lock()
|
||||
@ -1634,7 +1634,12 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.Analyz
|
||||
statsVer := uint64(0)
|
||||
defer func() {
|
||||
if err == nil && statsVer != 0 {
|
||||
err = recordHistoricalStatsMeta(sctx, tableID, statsVer)
|
||||
if err1 := recordHistoricalStatsMeta(sctx, tableID, statsVer); err1 != nil {
|
||||
logutil.BgLogger().Error("record historical stats meta failed",
|
||||
zap.Int64("table-id", tableID),
|
||||
zap.Uint64("version", statsVer),
|
||||
zap.Error(err1))
|
||||
}
|
||||
}
|
||||
}()
|
||||
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
|
||||
@ -1808,7 +1813,7 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count, modifyCount int64, isI
|
||||
statsVer := uint64(0)
|
||||
defer func() {
|
||||
if err == nil && statsVer != 0 {
|
||||
err = h.recordHistoricalStatsMeta(tableID, statsVer)
|
||||
h.recordHistoricalStatsMeta(tableID, statsVer)
|
||||
}
|
||||
}()
|
||||
h.mu.Lock()
|
||||
@ -1887,7 +1892,7 @@ func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error
|
||||
statsVer := uint64(0)
|
||||
defer func() {
|
||||
if err == nil && statsVer != 0 {
|
||||
err = h.recordHistoricalStatsMeta(tableID, statsVer)
|
||||
h.recordHistoricalStatsMeta(tableID, statsVer)
|
||||
}
|
||||
}()
|
||||
h.mu.Lock()
|
||||
@ -2093,7 +2098,7 @@ func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, t
|
||||
statsVer := uint64(0)
|
||||
defer func() {
|
||||
if err == nil && statsVer != 0 {
|
||||
err = h.recordHistoricalStatsMeta(tableID, statsVer)
|
||||
h.recordHistoricalStatsMeta(tableID, statsVer)
|
||||
}
|
||||
}()
|
||||
slices.Sort(colIDs)
|
||||
@ -2164,7 +2169,7 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi
|
||||
statsVer := uint64(0)
|
||||
defer func() {
|
||||
if err == nil && statsVer != 0 {
|
||||
err = h.recordHistoricalStatsMeta(tableID, statsVer)
|
||||
h.recordHistoricalStatsMeta(tableID, statsVer)
|
||||
}
|
||||
}()
|
||||
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
|
||||
@ -2377,7 +2382,7 @@ func (h *Handle) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.
|
||||
statsVer := uint64(0)
|
||||
defer func() {
|
||||
if err == nil && statsVer != 0 {
|
||||
err = h.recordHistoricalStatsMeta(tableID, statsVer)
|
||||
h.recordHistoricalStatsMeta(tableID, statsVer)
|
||||
}
|
||||
}()
|
||||
if extStats == nil || len(extStats.Stats) == 0 {
|
||||
@ -2676,10 +2681,16 @@ func recordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version u
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) error {
|
||||
func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
return recordHistoricalStatsMeta(h.mu.ctx, tableID, version)
|
||||
err := recordHistoricalStatsMeta(h.mu.ctx, tableID, version)
|
||||
if err != nil {
|
||||
logutil.BgLogger().Error("record historical stats meta failed",
|
||||
zap.Int64("table-id", tableID),
|
||||
zap.Uint64("version", version),
|
||||
zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// InsertAnalyzeJob inserts analyze job into mysql.analyze_jobs and gets job ID for further updating job.
|
||||
|
||||
@ -524,7 +524,7 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (up
|
||||
statsVer := uint64(0)
|
||||
defer func() {
|
||||
if err == nil && statsVer != 0 {
|
||||
err = h.recordHistoricalStatsMeta(id, statsVer)
|
||||
h.recordHistoricalStatsMeta(id, statsVer)
|
||||
}
|
||||
}()
|
||||
if delta.Count == 0 {
|
||||
|
||||
Reference in New Issue
Block a user