stats: implement stats handler for DDL notifier part 1 (#56419)
ref pingcap/tidb#55722
This commit is contained in:
@ -61,6 +61,8 @@ type HandlerID int
|
||||
const (
|
||||
// TestHandlerID is used for testing only.
|
||||
TestHandlerID HandlerID = 0
|
||||
// StatsMetaHandlerID is used to update statistics system table.
|
||||
StatsMetaHandlerID HandlerID = 1
|
||||
)
|
||||
|
||||
// String implements fmt.Stringer interface.
|
||||
@ -68,6 +70,8 @@ func (id HandlerID) String() string {
|
||||
switch id {
|
||||
case TestHandlerID:
|
||||
return "TestHandler"
|
||||
case StatsMetaHandlerID:
|
||||
return "StatsMetaHandler"
|
||||
default:
|
||||
return fmt.Sprintf("HandlerID(%d)", id)
|
||||
}
|
||||
|
||||
@ -7,6 +7,7 @@ go_library(
|
||||
"drop_partition.go",
|
||||
"exchange_partition.go",
|
||||
"reorganize_partition.go",
|
||||
"subscriber.go",
|
||||
"truncate_partition.go",
|
||||
],
|
||||
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/ddl",
|
||||
@ -17,6 +18,7 @@ go_library(
|
||||
"//pkg/meta/model",
|
||||
"//pkg/sessionctx",
|
||||
"//pkg/sessionctx/variable",
|
||||
"//pkg/statistics/handle/history",
|
||||
"//pkg/statistics/handle/lockstats",
|
||||
"//pkg/statistics/handle/logutil",
|
||||
"//pkg/statistics/handle/storage",
|
||||
|
||||
221
pkg/statistics/handle/ddl/subscriber.go
Normal file
221
pkg/statistics/handle/ddl/subscriber.go
Normal file
@ -0,0 +1,221 @@
|
||||
// Copyright 2024 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package ddl
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/tidb/pkg/ddl/notifier"
|
||||
"github.com/pingcap/tidb/pkg/meta/model"
|
||||
"github.com/pingcap/tidb/pkg/sessionctx"
|
||||
"github.com/pingcap/tidb/pkg/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/pkg/statistics/handle/history"
|
||||
"github.com/pingcap/tidb/pkg/statistics/handle/logutil"
|
||||
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
|
||||
"github.com/pingcap/tidb/pkg/statistics/handle/types"
|
||||
"github.com/pingcap/tidb/pkg/statistics/handle/util"
|
||||
"github.com/pingcap/tidb/pkg/util/intest"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
statsCache types.StatsCache
|
||||
}
|
||||
|
||||
// NewHandlerAndRegister creates a new handler and registers it to the DDL
|
||||
// notifier.
|
||||
func NewHandlerAndRegister(statsCache types.StatsCache) {
|
||||
h := handler{statsCache: statsCache}
|
||||
notifier.RegisterHandler(notifier.StatsMetaHandlerID, h.handle)
|
||||
}
|
||||
|
||||
func (h handler) handle(
|
||||
ctx context.Context,
|
||||
sctx sessionctx.Context,
|
||||
change *notifier.SchemaChangeEvent,
|
||||
) error {
|
||||
switch change.GetType() {
|
||||
case model.ActionCreateTable:
|
||||
info := change.GetCreateTableInfo()
|
||||
ids, err := getPhysicalIDs(sctx, info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, id := range ids {
|
||||
err = h.insertStats4PhysicalID(ctx, sctx, info, id)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
case model.ActionTruncateTable:
|
||||
newTableInfo, droppedTableInfo := change.GetTruncateTableInfo()
|
||||
ids, err := getPhysicalIDs(sctx, newTableInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, id := range ids {
|
||||
err = h.insertStats4PhysicalID(ctx, sctx, newTableInfo, id)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the old table stats.
|
||||
droppedIDs, err2 := getPhysicalIDs(sctx, droppedTableInfo)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
for _, id := range droppedIDs {
|
||||
err2 = h.delayedDeleteStats4PhysicalID(ctx, sctx, id)
|
||||
if err2 != nil {
|
||||
return errors.Trace(err2)
|
||||
}
|
||||
}
|
||||
case model.ActionDropTable:
|
||||
droppedTableInfo := change.GetDropTableInfo()
|
||||
ids, err := getPhysicalIDs(sctx, droppedTableInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, id := range ids {
|
||||
err = h.delayedDeleteStats4PhysicalID(ctx, sctx, id)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
case model.ActionAddColumn:
|
||||
// TODO: implement me
|
||||
case model.ActionModifyColumn:
|
||||
// TODO: implement me
|
||||
case model.ActionAddTablePartition:
|
||||
// TODO: implement me
|
||||
case model.ActionTruncateTablePartition:
|
||||
// TODO: implement me
|
||||
case model.ActionDropTablePartition:
|
||||
// TODO: implement me
|
||||
case model.ActionExchangeTablePartition:
|
||||
// TODO: implement me
|
||||
case model.ActionReorganizePartition:
|
||||
// TODO: implement me
|
||||
case model.ActionAlterTablePartitioning:
|
||||
// TODO: implement me
|
||||
case model.ActionRemovePartitioning:
|
||||
// TODO: implement me
|
||||
case model.ActionFlashbackCluster:
|
||||
// TODO: implement me
|
||||
default:
|
||||
intest.Assert(false)
|
||||
logutil.StatsLogger().Error("Unhandled schema change event",
|
||||
zap.Stringer("type", change))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h handler) insertStats4PhysicalID(
|
||||
ctx context.Context,
|
||||
sctx sessionctx.Context,
|
||||
info *model.TableInfo,
|
||||
id int64,
|
||||
) error {
|
||||
startTS, err := storage.InsertTableStats2KV(ctx, sctx, info, id)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, id, startTS))
|
||||
}
|
||||
|
||||
func (h handler) recordHistoricalStatsMeta(
|
||||
ctx context.Context,
|
||||
sctx sessionctx.Context,
|
||||
id int64,
|
||||
startTS uint64,
|
||||
) error {
|
||||
if startTS == 0 {
|
||||
return nil
|
||||
}
|
||||
enableHistoricalStats, err2 := getEnableHistoricalStats(sctx)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
if !enableHistoricalStats {
|
||||
return nil
|
||||
}
|
||||
|
||||
tbl, ok := h.statsCache.Get(id)
|
||||
if !ok || !tbl.IsInitialized() {
|
||||
return nil
|
||||
}
|
||||
|
||||
return history.RecordHistoricalStatsMeta(
|
||||
ctx,
|
||||
sctx,
|
||||
id,
|
||||
startTS,
|
||||
util.StatsMetaHistorySourceSchemaChange,
|
||||
)
|
||||
}
|
||||
|
||||
func (h handler) delayedDeleteStats4PhysicalID(
|
||||
ctx context.Context,
|
||||
sctx sessionctx.Context,
|
||||
id int64,
|
||||
) error {
|
||||
startTS, err2 := storage.UpdateStatsMetaVersionForGC(ctx, sctx, id)
|
||||
if err2 != nil {
|
||||
return errors.Trace(err2)
|
||||
}
|
||||
return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, id, startTS))
|
||||
}
|
||||
|
||||
func getPhysicalIDs(
|
||||
sctx sessionctx.Context,
|
||||
tblInfo *model.TableInfo,
|
||||
) (ids []int64, err error) {
|
||||
pi := tblInfo.GetPartitionInfo()
|
||||
if pi == nil {
|
||||
return []int64{tblInfo.ID}, nil
|
||||
}
|
||||
ids = make([]int64, 0, len(pi.Definitions)+1)
|
||||
for _, def := range pi.Definitions {
|
||||
ids = append(ids, def.ID)
|
||||
}
|
||||
pruneMode, err := getCurrentPruneMode(sctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if pruneMode == variable.Dynamic {
|
||||
ids = append(ids, tblInfo.ID)
|
||||
}
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
func getCurrentPruneMode(
|
||||
sctx sessionctx.Context,
|
||||
) (variable.PartitionPruneMode, error) {
|
||||
pruneMode, err := sctx.GetSessionVars().
|
||||
GlobalVarsAccessor.
|
||||
GetGlobalSysVar(variable.TiDBPartitionPruneMode)
|
||||
return variable.PartitionPruneMode(pruneMode), errors.Trace(err)
|
||||
}
|
||||
|
||||
func getEnableHistoricalStats(
|
||||
sctx sessionctx.Context,
|
||||
) (bool, error) {
|
||||
val, err := sctx.GetSessionVars().
|
||||
GlobalVarsAccessor.
|
||||
GetGlobalSysVar(variable.TiDBEnableHistoricalStats)
|
||||
return variable.TiDBOptOn(val), errors.Trace(err)
|
||||
}
|
||||
@ -15,6 +15,7 @@
|
||||
package history
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
@ -80,7 +81,10 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uin
|
||||
}
|
||||
}
|
||||
err := util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error {
|
||||
return RecordHistoricalStatsMeta(sctx, tableID, version, source)
|
||||
if !sctx.GetSessionVars().EnableHistoricalStats {
|
||||
return nil
|
||||
}
|
||||
return RecordHistoricalStatsMeta(util.StatsCtx, sctx, tableID, version, source)
|
||||
}, util.FlagWrapTxn)
|
||||
if err != nil { // just log the error, hide the error from the outside caller.
|
||||
logutil.BgLogger().Error("record historical stats meta failed",
|
||||
@ -101,14 +105,23 @@ func (sh *statsHistoryImpl) CheckHistoricalStatsEnable() (enable bool, err error
|
||||
}
|
||||
|
||||
// RecordHistoricalStatsMeta records the historical stats meta.
|
||||
func RecordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version uint64, source string) error {
|
||||
func RecordHistoricalStatsMeta(
|
||||
ctx context.Context,
|
||||
sctx sessionctx.Context,
|
||||
tableID int64,
|
||||
version uint64,
|
||||
source string,
|
||||
) error {
|
||||
if tableID == 0 || version == 0 {
|
||||
return errors.Errorf("tableID %d, version %d are invalid", tableID, version)
|
||||
}
|
||||
if !sctx.GetSessionVars().EnableHistoricalStats {
|
||||
return nil
|
||||
}
|
||||
rows, _, err := util.ExecRows(sctx, "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", tableID, version)
|
||||
rows, _, err := util.ExecRowsWithCtx(
|
||||
ctx,
|
||||
sctx,
|
||||
"select modify_count, count from mysql.stats_meta where table_id = %? and version = %?",
|
||||
tableID,
|
||||
version,
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -118,7 +131,16 @@ func RecordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version u
|
||||
modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)
|
||||
|
||||
const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time) VALUES (%?, %?, %?, %?, %?, NOW())"
|
||||
if _, err := util.Exec(sctx, sql, tableID, modifyCount, count, version, source); err != nil {
|
||||
if _, err := util.ExecWithCtx(
|
||||
ctx,
|
||||
sctx,
|
||||
sql,
|
||||
tableID,
|
||||
modifyCount,
|
||||
count,
|
||||
version,
|
||||
source,
|
||||
); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
cache.TableRowStatsCache.Invalidate(tableID)
|
||||
|
||||
Reference in New Issue
Block a user