From fc6b939ecc3e56ffbcbdfd338e88fc3eb98b70a3 Mon Sep 17 00:00:00 2001 From: dongjunduo Date: Thu, 3 Mar 2022 19:05:47 +0800 Subject: [PATCH] planner: introduce historical meta stats auto-dump (#32041) ref pingcap/tidb#18745 --- executor/analyze_test.go | 41 +++++++++++ executor/infoschema_cluster_table_test.go | 6 +- session/bootstrap.go | 24 ++++++- session/bootstrap_upgrade_test.go | 32 +++++++++ statistics/handle/ddl.go | 14 ++++ statistics/handle/handle.go | 85 +++++++++++++++++++++++ statistics/handle/update.go | 7 ++ 7 files changed, 205 insertions(+), 4 deletions(-) diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 0be814d76a..1dd9b426c0 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -2612,3 +2612,44 @@ func TestRecordHistoryStatsAfterAnalyze(t *testing.T) { // 5. historical stats must be equal to the current stats require.JSONEq(t, string(jsOrigin), string(jsCur)) } + +func TestRecordHistoryStatsMetaAfterAnalyze(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@tidb_analyze_version = 2") + tk.MustExec("set global tidb_enable_historical_stats = 0") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b int)") + tk.MustExec("analyze table test.t") + + h := dom.StatsHandle() + is := dom.InfoSchema() + tableInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + + // 1. switch off the tidb_enable_historical_stats, and there is no record in table `mysql.stats_meta_history` + tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_meta_history where table_id = '%d'", tableInfo.Meta().ID)).Check(testkit.Rows("0")) + // insert demo tuples, and there is no record either. 
+ insertNums := 5 + for i := 0; i < insertNums; i++ { + tk.MustExec("insert into test.t (a,b) values (1,1), (2,2), (3,3)") + err := h.DumpStatsDeltaToKV(handle.DumpDelta) + require.NoError(t, err) + } + tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_meta_history where table_id = '%d'", tableInfo.Meta().ID)).Check(testkit.Rows("0")) + + // 2. switch on the tidb_enable_historical_stats and insert tuples to produce count/modifyCount delta change. + tk.MustExec("set global tidb_enable_historical_stats = 1") + defer tk.MustExec("set global tidb_enable_historical_stats = 0") + + for i := 0; i < insertNums; i++ { + tk.MustExec("insert into test.t (a,b) values (1,1), (2,2), (3,3)") + err := h.DumpStatsDeltaToKV(handle.DumpDelta) + require.NoError(t, err) + } + tk.MustQuery(fmt.Sprintf("select modify_count, count from mysql.stats_meta_history where table_id = '%d' order by create_time", tableInfo.Meta().ID)).Sort().Check( + testkit.Rows("18 18", "21 21", "24 24", "27 27", "30 30")) +} diff --git a/executor/infoschema_cluster_table_test.go b/executor/infoschema_cluster_table_test.go index 64e76cc529..3ca92396aa 100644 --- a/executor/infoschema_cluster_table_test.go +++ b/executor/infoschema_cluster_table_test.go @@ -319,7 +319,7 @@ func (s *infosSchemaClusterTableSuite) TestTableStorageStats() { "test 2", )) rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows() - s.Require().Len(rows, 29) + s.Require().Len(rows, 30) // More tests about the privileges. 
tk.MustExec("create user 'testuser'@'localhost'") @@ -345,12 +345,12 @@ func (s *infosSchemaClusterTableSuite) TestTableStorageStats() { Hostname: "localhost", }, nil, nil)) - tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("29")) + tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("30")) s.Require().True(tk.Session().Auth(&auth.UserIdentity{ Username: "testuser3", Hostname: "localhost", }, nil, nil)) - tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("29")) + tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("30")) } diff --git a/session/bootstrap.go b/session/bootstrap.go index 214ba31745..c1f04854d2 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -386,6 +386,16 @@ const ( UNIQUE KEY table_version_seq (table_id, version, seq_no), KEY table_create_time (table_id, create_time, seq_no) );` + // CreateStatsMetaHistory stores the historical meta stats. + CreateStatsMetaHistory = `CREATE TABLE IF NOT EXISTS mysql.stats_meta_history ( + table_id bigint(64) NOT NULL, + modify_count bigint(64) NOT NULL, + count bigint(64) NOT NULL, + version bigint(64) NOT NULL comment 'stats version which corresponding to stats:version in EXPLAIN', + create_time datetime(6) NOT NULL, + UNIQUE KEY table_version (table_id, version), + KEY table_create_time (table_id, create_time) + );` ) // bootstrap initiates system DB for a store. @@ -569,11 +579,13 @@ const ( version82 = 82 // version83 adds the tables mysql.stats_history version83 = 83 + // version84 adds the tables mysql.stats_meta_history + version84 = 84 ) // currentBootstrapVersion is defined as a variable, so we can modify its value for testing. 
// please make sure this is the largest version -var currentBootstrapVersion int64 = version83 +var currentBootstrapVersion int64 = version84 var ( bootstrapVersion = []func(Session, int64){ @@ -660,6 +672,7 @@ var ( upgradeToVer81, upgradeToVer82, upgradeToVer83, + upgradeToVer84, } ) @@ -1722,6 +1735,13 @@ func upgradeToVer83(s Session, ver int64) { doReentrantDDL(s, CreateStatsHistory) } +func upgradeToVer84(s Session, ver int64) { + if ver >= version84 { + return + } + doReentrantDDL(s, CreateStatsMetaHistory) +} + func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, @@ -1810,6 +1830,8 @@ func doDDLWorks(s Session) { mustExecute(s, CreateAnalyzeOptionsTable) // Create stats_history table. mustExecute(s, CreateStatsHistory) + // Create stats_meta_history table. + mustExecute(s, CreateStatsMetaHistory) } // doDMLWorks executes DML statements in bootstrap stage. 
diff --git a/session/bootstrap_upgrade_test.go b/session/bootstrap_upgrade_test.go
index 815bfebc08..15981329b4 100644
--- a/session/bootstrap_upgrade_test.go
+++ b/session/bootstrap_upgrade_test.go
@@ -55,3 +55,35 @@ func TestUpgradeVersion83(t *testing.T) {
 		require.Equal(t, statsHistoryTblFields[i].tp, strings.ToLower(row.GetString(1)))
 	}
 }
+
+// TestUpgradeVersion84 checks the bootstrap version and the column layout of mysql.stats_meta_history.
+func TestUpgradeVersion84(t *testing.T) {
+	ctx := context.Background()
+	store, _, clean := testkit.CreateMockStoreAndDomain(t)
+	defer clean()
+
+	tk := testkit.NewTestKit(t, store)
+	ver, err := session.GetBootstrapVersion(tk.Session())
+	require.NoError(t, err)
+	require.Equal(t, session.CurrentBootstrapVersion, ver)
+
+	statsMetaHistoryTblFields := []struct {
+		field, tp string
+	}{
+		{"table_id", "bigint(64)"},
+		{"modify_count", "bigint(64)"},
+		{"count", "bigint(64)"},
+		{"version", "bigint(64)"},
+		{"create_time", "datetime(6)"},
+	}
+	rStatsMetaHistoryTbl, err := tk.Exec(`desc mysql.stats_meta_history`)
+	require.NoError(t, err)
+	req := rStatsMetaHistoryTbl.NewChunk(nil)
+	require.NoError(t, rStatsMetaHistoryTbl.Next(ctx, req))
+	require.Equal(t, len(statsMetaHistoryTblFields), req.NumRows())
+	for i, want := range statsMetaHistoryTblFields {
+		row := req.GetRow(i) // rows of `desc` are compared in fixture order
+		require.Equal(t, want.field, strings.ToLower(row.GetString(0)))
+		require.Equal(t, want.tp, strings.ToLower(row.GetString(1)))
+	}
+}
diff --git a/statistics/handle/ddl.go b/statistics/handle/ddl.go
index 6fdf102548..9be4b1744a 100644
--- a/statistics/handle/ddl.go
+++ b/statistics/handle/ddl.go
@@ -176,6 +176,12 @@ func (h *Handle) DDLEventCh() chan *util.Event {
 // insertTableStats2KV inserts a record standing for a new table to stats_meta and inserts some records standing for the
 // new columns and indices which belong to this table.
func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) { + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + err = h.recordHistoricalStatsMeta(physicalID, statsVer) + } + }() h.mu.Lock() defer h.mu.Unlock() ctx := context.Background() @@ -195,6 +201,7 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (e if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil { return err } + statsVer = startTS for _, col := range info.Columns { if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil { return err @@ -211,6 +218,12 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (e // insertColStats2KV insert a record to stats_histograms with distinct_count 1 and insert a bucket to stats_buckets with default value. // This operation also updates version. func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) (err error) { + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + err = h.recordHistoricalStatsMeta(physicalID, statsVer) + } + }() h.mu.Lock() defer h.mu.Unlock() @@ -233,6 +246,7 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf if err != nil { return } + statsVer = startTS // If we didn't update anything by last SQL, it means the stats of this table does not exist. if h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0 { // By this step we can get the count of this table, then we can sure the count and repeats of bucket. 
diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 13e19d4b95..52997af59f 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -984,6 +984,12 @@ func (h *Handle) StatsMetaCountAndModifyCount(tableID int64) (int64, int64, erro // SaveTableStatsToStorage saves the stats of a table to storage. func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, needDumpFMS bool) (err error) { tableID := results.TableID.GetStatisticsID() + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + err = h.recordHistoricalStatsMeta(tableID, statsVer) + } + }() h.mu.Lock() defer h.mu.Unlock() ctx := context.TODO() @@ -1026,6 +1032,7 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count, snapshot) values (%?, %?, %?, %?)", version, tableID, results.Count, results.Snapshot); err != nil { return err } + statsVer = version } else { modifyCnt := curModifyCnt - results.BaseModifyCnt if modifyCnt < 0 { @@ -1038,6 +1045,7 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version=%?, modify_count=%?, count=%?, snapshot=%? where table_id=%?", version, modifyCnt, cnt, results.Snapshot, tableID); err != nil { return err } + statsVer = version } // 2. Save histograms. for _, result := range results.Ars { @@ -1149,6 +1157,12 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee // SaveStatsToStorage saves the stats to storage. 
// TODO: refactor to reduce the number of parameters func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, fms *statistics.FMSketch, statsVersion int, isAnalyzed int64, needDumpFMS bool, updateAnalyzeTime bool) (err error) { + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + err = h.recordHistoricalStatsMeta(tableID, statsVer) + } + }() h.mu.Lock() defer h.mu.Unlock() ctx := context.TODO() @@ -1175,6 +1189,7 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg if err != nil { return err } + statsVer = version cmSketch, err := statistics.EncodeCMSketchWithoutTopN(cms) if err != nil { return err @@ -1252,6 +1267,12 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg // SaveMetaToStorage will save stats_meta to storage. func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error) { + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + err = h.recordHistoricalStatsMeta(tableID, statsVer) + } + }() h.mu.Lock() defer h.mu.Unlock() ctx := context.TODO() @@ -1269,6 +1290,7 @@ func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error } version := txn.StartTS() _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count, modify_count) values (%?, %?, %?, %?)", version, tableID, count, modifyCount) + statsVer = version return err } @@ -1445,6 +1467,12 @@ const ( // InsertExtendedStats inserts a record into mysql.stats_extended and update version in mysql.stats_meta. 
func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, tableID int64, ifNotExists bool) (err error) { + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + err = h.recordHistoricalStatsMeta(tableID, statsVer) + } + }() sort.Slice(colIDs, func(i, j int) bool { return colIDs[i] < colIDs[j] }) bytes, err := json.Marshal(colIDs) if err != nil { @@ -1490,6 +1518,7 @@ func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, t if _, err = exec.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil { return err } + statsVer = version // Remove the existing 'deleted' records. if _, err = exec.ExecuteInternal(ctx, "DELETE FROM mysql.stats_extended WHERE name = %? and table_id = %?", statsName, tableID); err != nil { return err @@ -1509,6 +1538,12 @@ func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, t // MarkExtendedStatsDeleted update the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta. func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error) { + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + err = h.recordHistoricalStatsMeta(tableID, statsVer) + } + }() ctx := context.Background() rows, _, err := h.execRestrictedSQL(ctx, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %? and status in (%?, %?)", statsName, tableID, StatsStatusInited, StatsStatusAnalyzed) if err != nil { @@ -1546,6 +1581,7 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi if _, err = exec.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil { return err } + statsVer = version if _, err = exec.ExecuteInternal(ctx, "UPDATE mysql.stats_extended SET version = %?, status = %? WHERE name = %? 
and table_id = %?", version, StatsStatusDeleted, statsName, tableID); err != nil { return err } @@ -1715,6 +1751,12 @@ func (h *Handle) fillExtStatsCorrVals(item *statistics.ExtendedStatsItem, cols [ // SaveExtendedStatsToStorage writes extended stats of a table into mysql.stats_extended. func (h *Handle) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) { + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + err = h.recordHistoricalStatsMeta(tableID, statsVer) + } + }() if extStats == nil || len(extStats.Stats) == 0 { return nil } @@ -1756,6 +1798,7 @@ func (h *Handle) SaveExtendedStatsToStorage(tableID int64, extStats *statistics. if _, err := exec.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil { return err } + statsVer = version } return nil } @@ -1956,9 +1999,51 @@ func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model. // CheckHistoricalStatsEnable is used to check whether TiDBEnableHistoricalStats is enabled. 
func (h *Handle) CheckHistoricalStatsEnable() (enable bool, err error) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
 	val, err := h.mu.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableHistoricalStats)
 	if err != nil {
 		return false, errors.Trace(err)
 	}
 	return variable.TiDBOptOn(val), nil
 }
+
+// recordHistoricalStatsMeta copies the modify_count and count stored in mysql.stats_meta for (tableID, version)
+// into mysql.stats_meta_history; it does nothing when tidb_enable_historical_stats is off.
+func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) (err error) {
+	if tableID == 0 || version == 0 {
+		return errors.Errorf("tableID %d, version %d are invalid", tableID, version)
+	}
+	historicalStatsEnabled, err := h.CheckHistoricalStatsEnable()
+	if err != nil {
+		return errors.Errorf("check tidb_enable_historical_stats failed: %v", err)
+	}
+	if !historicalStatsEnabled {
+		return nil
+	}
+
+	ctx := context.Background()
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	rows, _, err := h.execRestrictedSQL(ctx, "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", tableID, version)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if len(rows) == 0 {
+		return errors.New("no historical meta stats can be recorded")
+	}
+	modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)
+
+	exec := h.mu.ctx.(sqlexec.SQLExecutor)
+	if _, err = exec.ExecuteInternal(ctx, "begin pessimistic"); err != nil {
+		return errors.Trace(err)
+	}
+	// err is the named result (never shadowed below), so finishTransaction sees the real outcome and its own error is returned.
+	defer func() {
+		err = finishTransaction(ctx, exec, err)
+	}()
+
+	const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, create_time) VALUES (%?, %?, %?, %?, NOW())"
+	_, err = exec.ExecuteInternal(ctx, sql, tableID, modifyCount, count, version)
+	return errors.Trace(err)
+}
diff --git a/statistics/handle/update.go b/statistics/handle/update.go
index d1a1767ce2..a62ca64f69 100644
--- a/statistics/handle/update.go
+++ b/statistics/handle/update.go
@@ -484,6 +484,12 @@ func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {

 // dumpTableStatDeltaToKV dumps a single delta with some table to KV and updates the version.
func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (updated bool, err error) { + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + err = h.recordHistoricalStatsMeta(id, statsVer) + } + }() if delta.Count == 0 { return true, nil } @@ -511,6 +517,7 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (up } else { _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", startTS, delta.Delta, delta.Count, id) } + statsVer = startTS return errors.Trace(err) } if err = updateStatsMeta(id); err != nil {