ddl, stat: switch to new struct for remove partition, flash back and remove DDLEvent (#55973)

ref pingcap/tidb#55723
This commit is contained in:
fzzf678
2024-09-11 13:43:58 +08:00
committed by GitHub
parent 28b7e3f00e
commit 2ea3542218
19 changed files with 199 additions and 386 deletions

View File

@ -129,7 +129,6 @@ go_library(
"//pkg/sessiontxn",
"//pkg/statistics",
"//pkg/statistics/handle",
"//pkg/statistics/handle/util",
"//pkg/store/driver/txn",
"//pkg/store/helper",
"//pkg/table",

View File

@ -42,7 +42,6 @@ import (
field_types "github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
driver "github.com/pingcap/tidb/pkg/types/parser_driver"
@ -133,9 +132,7 @@ func onAddColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, e
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
addColumnEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewAddColumnEvent(tblInfo, []*model.ColumnInfo{columnInfo}),
}
addColumnEvent := util.NewAddColumnEvent(tblInfo, []*model.ColumnInfo{columnInfo})
asyncNotifyEvent(jobCtx, addColumnEvent, job)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", columnInfo.State)

View File

@ -30,6 +30,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
@ -38,7 +39,6 @@ import (
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/filter"
@ -821,7 +821,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Meta, job *model
case model.StateWriteReorganization:
// TODO: Support flashback in unistore.
if inFlashbackTest {
asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent(), job)
asyncNotifyEvent(jobCtx, util.NewFlashbackClusterEvent(), job)
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return ver, nil
@ -844,7 +844,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Meta, job *model
}
}
asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent(), job)
asyncNotifyEvent(jobCtx, util.NewFlashbackClusterEvent(), job)
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return updateSchemaVersion(jobCtx, t, job)

View File

@ -41,7 +41,6 @@ import (
field_types "github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/types"
@ -182,9 +181,7 @@ func onCreateTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
createTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewCreateTableEvent(tbInfo),
}
createTableEvent := util.NewCreateTableEvent(tbInfo)
asyncNotifyEvent(jobCtx, createTableEvent, job)
return ver, errors.Trace(err)
}
@ -213,9 +210,7 @@ func createTableWithForeignKeys(jobCtx *jobContext, t *meta.Meta, job *model.Job
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
createTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewCreateTableEvent(tbInfo),
}
createTableEvent := util.NewCreateTableEvent(tbInfo)
asyncNotifyEvent(jobCtx, createTableEvent, job)
return ver, nil
default:
@ -270,9 +265,7 @@ func onCreateTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (int64, er
job.SchemaState = model.StatePublic
job.BinlogInfo.SetTableInfos(ver, args)
for i := range args {
createTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewCreateTableEvent(args[i]),
}
createTableEvent := util.NewCreateTableEvent(args[i])
asyncNotifyEvent(jobCtx, createTableEvent, job)
}

View File

@ -57,7 +57,6 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/binloginfo"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
pumpcli "github.com/pingcap/tidb/pkg/tidb-binlog/pump_client"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
@ -313,7 +312,7 @@ type ddlCtx struct {
schemaVerSyncer schemaver.Syncer
serverStateSyncer serverstate.Syncer
ddlEventCh chan<- *statsutil.DDLEvent
ddlEventCh chan<- *util.SchemaChangeEvent
lease time.Duration // lease is schema lease, default 45s, see config.Lease.
binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog.
infoCache *infoschema.InfoCache
@ -555,7 +554,7 @@ func (d *ddl) RegisterStatsHandle(h *handle.Handle) {
// asyncNotifyEvent will notify the ddl event to outside world, say statistic handle. When the channel is full, we may
// give up notify and log it.
func asyncNotifyEvent(jobCtx *jobContext, e *statsutil.DDLEvent, job *model.Job) {
func asyncNotifyEvent(jobCtx *jobContext, e *util.SchemaChangeEvent, job *model.Job) {
// skip notify for system databases, system databases are expected to change at
// bootstrap and other nodes can also handle the changing in its bootstrap rather
// than be notified.

View File

@ -42,7 +42,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/types"
@ -531,9 +530,7 @@ func (w *worker) doModifyColumnTypeWithData(
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
// Refactor the job args to add the old index ids into delete range table.
job.Args = []any{rmIdxIDs, getPartitionIDs(tblInfo)}
modifyColumnEvent := &statsutil.DDLEvent{
SchemaChangeEvent: ddlutil.NewModifyColumnEvent(tblInfo, []*model.ColumnInfo{changingCol}),
}
modifyColumnEvent := ddlutil.NewModifyColumnEvent(tblInfo, []*model.ColumnInfo{changingCol})
asyncNotifyEvent(jobCtx, modifyColumnEvent, job)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", changingCol.State)

View File

@ -50,7 +50,6 @@ import (
field_types "github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
@ -230,9 +229,7 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, t *meta.Meta, job *mode
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
addPartitionEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewAddPartitionEvent(tblInfo, partInfo),
}
addPartitionEvent := util.NewAddPartitionEvent(tblInfo, partInfo)
asyncNotifyEvent(jobCtx, addPartitionEvent, job)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState)
@ -2332,12 +2329,10 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, t *meta.Meta, job *mod
}
job.SchemaState = model.StateNone
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
dropPartitionEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewDropPartitionEvent(
tblInfo,
&model.PartitionInfo{Definitions: droppedDefs},
),
}
dropPartitionEvent := util.NewDropPartitionEvent(
tblInfo,
&model.PartitionInfo{Definitions: droppedDefs},
)
asyncNotifyEvent(jobCtx, dropPartitionEvent, job)
// A background job will be created to delete old partition data.
job.Args = []any{physicalTableIDs}
@ -2425,13 +2420,11 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, t *meta.Meta, job
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
truncatePartitionEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewTruncatePartitionEvent(
tblInfo,
&model.PartitionInfo{Definitions: newPartitions},
&model.PartitionInfo{Definitions: oldPartitions},
),
}
truncatePartitionEvent := util.NewTruncatePartitionEvent(
tblInfo,
&model.PartitionInfo{Definitions: newPartitions},
&model.PartitionInfo{Definitions: oldPartitions},
)
asyncNotifyEvent(jobCtx, truncatePartitionEvent, job)
// A background job will be created to delete old partition data.
job.Args = []any{oldIDs}
@ -2565,13 +2558,11 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, t *meta.Meta, job
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
truncatePartitionEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewTruncatePartitionEvent(
tblInfo,
&model.PartitionInfo{Definitions: newPartitions},
&model.PartitionInfo{Definitions: oldPartitions},
),
}
truncatePartitionEvent := util.NewTruncatePartitionEvent(
tblInfo,
&model.PartitionInfo{Definitions: newPartitions},
&model.PartitionInfo{Definitions: oldPartitions},
)
asyncNotifyEvent(jobCtx, truncatePartitionEvent, job)
// A background job will be created to delete old partition data.
job.Args = []any{oldIDs}
@ -2939,13 +2930,11 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, t *meta.Meta, job
}
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, pt)
exchangePartitionEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewExchangePartitionEvent(
pt,
&model.PartitionInfo{Definitions: []model.PartitionDefinition{originalPartitionDef}},
originalNt,
),
}
exchangePartitionEvent := util.NewExchangePartitionEvent(
pt,
&model.PartitionInfo{Definitions: []model.PartitionDefinition{originalPartitionDef}},
originalNt,
)
asyncNotifyEvent(jobCtx, exchangePartitionEvent, job)
return ver, nil
}
@ -3477,10 +3466,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo
// Should it actually be synchronous?
// Include the old table ID, if changed, which may contain global statistics,
// so it can be reused for the new (non)partitioned table.
event, err := newStatsDDLEventForJob(
job.SchemaID,
job.Type, oldTblID, tblInfo, statisticsPartInfo, droppedPartInfo,
)
event, err := newStatsDDLEventForJob(job.Type, oldTblID, tblInfo, statisticsPartInfo, droppedPartInfo)
if err != nil {
return ver, errors.Trace(err)
}
@ -3495,37 +3481,31 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo
return ver, errors.Trace(err)
}
// newStatsDDLEventForJob creates a statsutil.DDLEvent for a job.
// newStatsDDLEventForJob creates a util.SchemaChangeEvent for a job.
// It is used for reorganize partition, add partitioning and remove partitioning.
func newStatsDDLEventForJob(
schemaID int64,
jobType model.ActionType,
oldTblID int64,
tblInfo *model.TableInfo,
addedPartInfo *model.PartitionInfo,
droppedPartInfo *model.PartitionInfo,
) (*statsutil.DDLEvent, error) {
var event *statsutil.DDLEvent
) (*util.SchemaChangeEvent, error) {
var event *util.SchemaChangeEvent
switch jobType {
case model.ActionReorganizePartition:
event = &statsutil.DDLEvent{
SchemaChangeEvent: util.NewReorganizePartitionEvent(
tblInfo,
addedPartInfo,
droppedPartInfo,
),
}
event = util.NewReorganizePartitionEvent(
tblInfo,
addedPartInfo,
droppedPartInfo,
)
case model.ActionAlterTablePartitioning:
event = &statsutil.DDLEvent{
SchemaChangeEvent: util.NewAddPartitioningEvent(
oldTblID,
tblInfo,
addedPartInfo,
),
}
event = util.NewAddPartitioningEvent(
oldTblID,
tblInfo,
addedPartInfo,
)
case model.ActionRemovePartitioning:
event = statsutil.NewRemovePartitioningEvent(
schemaID,
event = util.NewRemovePartitioningEvent(
oldTblID,
tblInfo,
droppedPartInfo,

View File

@ -39,7 +39,6 @@ import (
pmodel "github.com/pingcap/tidb/pkg/parser/model"
field_types "github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
@ -124,9 +123,7 @@ func onDropTableOrView(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver in
startKey := tablecodec.EncodeTablePrefix(job.TableID)
job.Args = append(job.Args, startKey, oldIDs, ruleIDs)
if !tblInfo.IsSequence() && !tblInfo.IsView() {
dropTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewDropTableEvent(tblInfo),
}
dropTableEvent := util.NewDropTableEvent(tblInfo)
asyncNotifyEvent(jobCtx, dropTableEvent, job)
}
default:
@ -573,9 +570,7 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
truncateTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewTruncateTableEvent(tblInfo, oldTblInfo),
}
truncateTableEvent := util.NewTruncateTableEvent(tblInfo, oldTblInfo)
asyncNotifyEvent(jobCtx, truncateTableEvent, job)
// see truncateTableByReassignPartitionIDs for why they might change.
args.OldPartitionIDs = oldPartitionIDs

View File

@ -44,6 +44,8 @@ go_test(
embed = [":util"],
flaky = True,
deps = [
"//pkg/meta/model",
"//pkg/parser/model",
"//pkg/testkit/testsetup",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",

View File

@ -27,15 +27,14 @@ import (
// check the GetType of SchemaChange and call the corresponding getter function
// to retrieve the needed information.
type SchemaChangeEvent struct {
// todo: field and method will be added in the next few pr on demand
tableInfo *model.TableInfo
oldTableInfo *model.TableInfo
addedPartInfo *model.PartitionInfo
droppedPartInfo *model.PartitionInfo
columnInfos []*model.ColumnInfo
// nonPartTableID is used to store the non-partitioned table that is converted to
// a partitioned table in NewAddPartitioningEvent.
nonPartTableID int64
// oldTableID4Partition is used to store the table ID when a table transitions from being partitioned to non-partitioned,
// or vice versa.
oldTableID4Partition int64
tp model.ActionType
}
@ -54,17 +53,23 @@ func (s *SchemaChangeEvent) String() string {
if s.oldTableInfo != nil {
_, _ = fmt.Fprintf(&sb, ", Old Table ID: %d, Old Table Name: %s", s.oldTableInfo.ID, s.oldTableInfo.Name)
}
if s.nonPartTableID != 0 {
_, _ = fmt.Fprintf(&sb, ", Old Table ID for Partition: %d", s.nonPartTableID)
if s.oldTableID4Partition != 0 {
_, _ = fmt.Fprintf(&sb, ", Old Table ID for Partition: %d", s.oldTableID4Partition)
}
if s.addedPartInfo != nil {
for _, partDef := range s.addedPartInfo.Definitions {
_, _ = fmt.Fprintf(&sb, ", Partition Name: %s, Partition ID: %d", partDef.Name, partDef.ID)
if partDef.Name.L != "" {
_, _ = fmt.Fprintf(&sb, ", Partition Name: %s", partDef.Name)
}
_, _ = fmt.Fprintf(&sb, ", Partition ID: %d", partDef.ID)
}
}
if s.droppedPartInfo != nil {
for _, partDef := range s.droppedPartInfo.Definitions {
_, _ = fmt.Fprintf(&sb, ", Dropped Partition Name: %s, Dropped Partition ID: %d", partDef.Name, partDef.ID)
if partDef.Name.L != "" {
_, _ = fmt.Fprintf(&sb, ", Dropped Partition Name: %s", partDef.Name)
}
_, _ = fmt.Fprintf(&sb, ", Dropped Partition ID: %d", partDef.ID)
}
}
for _, columnInfo := range s.columnInfos {
@ -323,10 +328,10 @@ func NewAddPartitioningEvent(
addedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionAlterTablePartitioning,
nonPartTableID: nonPartTableID,
tableInfo: newGlobalTableInfo,
addedPartInfo: addedPartInfo,
tp: model.ActionAlterTablePartitioning,
oldTableID4Partition: nonPartTableID,
tableInfo: newGlobalTableInfo,
addedPartInfo: addedPartInfo,
}
}
@ -339,5 +344,39 @@ func (s *SchemaChangeEvent) GetAddPartitioningInfo() (
addedPartInfo *model.PartitionInfo,
) {
intest.Assert(s.tp == model.ActionAlterTablePartitioning)
return s.nonPartTableID, s.tableInfo, s.addedPartInfo
return s.oldTableID4Partition, s.tableInfo, s.addedPartInfo
}
// NewRemovePartitioningEvent creates a schema change event whose type is
// ActionRemovePartitioning.
func NewRemovePartitioningEvent(
oldPartitionedTableID int64,
nonPartitionTableInfo *model.TableInfo,
droppedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionRemovePartitioning,
oldTableID4Partition: oldPartitionedTableID,
tableInfo: nonPartitionTableInfo,
droppedPartInfo: droppedPartInfo,
}
}
// GetRemovePartitioningInfo returns the table info and partition info of the SchemaChangeEvent whose type is
// ActionRemovePartitioning.
func (s *SchemaChangeEvent) GetRemovePartitioningInfo() (
oldPartitionedTableID int64,
newSingleTableInfo *model.TableInfo,
droppedPartInfo *model.PartitionInfo,
) {
intest.Assert(s.tp == model.ActionRemovePartitioning)
return s.oldTableID4Partition, s.tableInfo, s.droppedPartInfo
}
// NewFlashbackClusterEvent creates a schema change event whose type is
// ActionFlashbackCluster.
func NewFlashbackClusterEvent() *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionFlashbackCluster,
}
}

View File

@ -19,6 +19,8 @@ import (
"path/filepath"
"testing"
"github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/stretchr/testify/require"
)
@ -32,3 +34,43 @@ func TestFolderNotEmpty(t *testing.T) {
require.NoError(t, f.Close())
require.True(t, FolderNotEmpty(tmp))
}
func TestEventString(t *testing.T) {
// Create an Event object
e := &SchemaChangeEvent{
tp: model.ActionAddColumn,
tableInfo: &model.TableInfo{
ID: 1,
Name: pmodel.NewCIStr("Table1"),
},
addedPartInfo: &model.PartitionInfo{
Definitions: []model.PartitionDefinition{
{ID: 2},
{ID: 3},
},
},
oldTableInfo: &model.TableInfo{
ID: 4,
Name: pmodel.NewCIStr("Table2"),
},
droppedPartInfo: &model.PartitionInfo{
Definitions: []model.PartitionDefinition{
{ID: 5},
{ID: 6},
},
},
columnInfos: []*model.ColumnInfo{
{ID: 7, Name: pmodel.NewCIStr("Column1")},
{ID: 8, Name: pmodel.NewCIStr("Column2")},
},
}
// Call the String method
result := e.String()
// Check the result
expected := "(Event Type: add column, Table ID: 1, Table Name: Table1, Old Table ID: 4, Old Table Name: Table2," +
" Partition ID: 2, Partition ID: 3, Dropped Partition ID: 5, Dropped Partition ID: 6, " +
"Column ID: 7, Column Name: Column1, Column ID: 8, Column Name: Column2)"
require.Equal(t, expected, result)
}

View File

@ -22,6 +22,7 @@ go_library(
"//pkg/statistics/handle/storage",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/util/intest",
"@com_github_pingcap_errors//:errors",
"@org_uber_go_zap//:zap",
],
@ -34,10 +35,10 @@ go_test(
flaky = True,
shard_count = 18,
deps = [
"//pkg/ddl/util",
"//pkg/meta/model",
"//pkg/parser/model",
"//pkg/planner/cardinality",
"//pkg/statistics/handle/util",
"//pkg/testkit",
"//pkg/types",
"//pkg/util/mock",

View File

@ -16,6 +16,7 @@ package ddl
import (
"github.com/pingcap/errors"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
@ -24,11 +25,12 @@ import (
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
"github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/intest"
"go.uber.org/zap"
)
type ddlHandlerImpl struct {
ddlEventCh chan *util.DDLEvent
ddlEventCh chan *ddlutil.SchemaChangeEvent
statsWriter types.StatsReadWriter
statsHandler types.StatsHandle
globalStatsHandler types.StatsGlobal
@ -41,7 +43,7 @@ func NewDDLHandler(
globalStatsHandler types.StatsGlobal,
) types.DDL {
return &ddlHandlerImpl{
ddlEventCh: make(chan *util.DDLEvent, 1000),
ddlEventCh: make(chan *ddlutil.SchemaChangeEvent, 1000),
statsWriter: statsWriter,
statsHandler: statsHandler,
globalStatsHandler: globalStatsHandler,
@ -49,65 +51,16 @@ func NewDDLHandler(
}
// HandleDDLEvent begins to process a ddl task.
func (h *ddlHandlerImpl) HandleDDLEvent(t *util.DDLEvent) error {
func (h *ddlHandlerImpl) HandleDDLEvent(s *ddlutil.SchemaChangeEvent) error {
sctx, err := h.statsHandler.SPool().Get()
if err != nil {
return err
}
defer h.statsHandler.SPool().Put(sctx)
// ActionFlashbackCluster will not create any new stats info
// and it's SchemaID alwayws equals to 0, so skip check it.
if t.GetType() != model.ActionFlashbackCluster && t.SchemaChangeEvent == nil {
if isSysDB, err := t.IsMemOrSysDB(sctx.(sessionctx.Context)); err != nil {
return err
} else if isSysDB {
// EXCHANGE PARTITION EVENT NOTES:
// 1. When a partition is exchanged with a system table, we need to adjust the global statistics
// based on the count delta and modify count delta. However, due to the involvement of the system table,
// a complete update of the global statistics is not feasible. Therefore, we bypass the statistics update
// for the table in this scenario. Despite this, the table id still changes, so the statistics for the
// system table will still be visible.
// 2. If the system table is a partitioned table, we will update the global statistics for the partitioned table.
// It is rare to exchange a partition from a system table, so we can ignore this case. In this case,
// the system table will have statistics, but this is not a significant issue.
logutil.StatsLogger().Info("Skip handle system database ddl event", zap.Stringer("event", t))
return nil
}
}
if t.SchemaChangeEvent == nil {
// when SchemaChangeEvent is set, it will be printed in the default branch of
// below switch.
logutil.StatsLogger().Info("Handle ddl event", zap.Stringer("event", t))
}
switch t.GetType() {
case model.ActionRemovePartitioning:
// Change id for global stats, since the data has not changed!
// Note: This operation will update all tables related to statistics with the new ID.
oldTblID,
newSingleTableInfo,
droppedPartInfo := t.GetRemovePartitioningInfo()
if err := h.statsWriter.ChangeGlobalStatsID(oldTblID, newSingleTableInfo.ID); err != nil {
return err
}
// Remove partition stats.
for _, def := range droppedPartInfo.Definitions {
if err := h.statsWriter.UpdateStatsMetaVersionForGC(def.ID); err != nil {
return err
}
}
case model.ActionFlashbackCluster:
return h.statsWriter.UpdateStatsVersion()
default:
logutil.StatsLogger().Info("Handle schema change event", zap.Stringer("event", t.SchemaChangeEvent))
}
e := t.SchemaChangeEvent
switch e.GetType() {
switch s.GetType() {
case model.ActionCreateTable:
newTableInfo := e.GetCreateTableInfo()
newTableInfo := s.GetCreateTableInfo()
ids, err := h.getTableIDs(newTableInfo)
if err != nil {
return err
@ -118,7 +71,7 @@ func (h *ddlHandlerImpl) HandleDDLEvent(t *util.DDLEvent) error {
}
}
case model.ActionTruncateTable:
newTableInfo, droppedTableInfo := e.GetTruncateTableInfo()
newTableInfo, droppedTableInfo := s.GetTruncateTableInfo()
ids, err := h.getTableIDs(newTableInfo)
if err != nil {
return err
@ -140,7 +93,7 @@ func (h *ddlHandlerImpl) HandleDDLEvent(t *util.DDLEvent) error {
}
}
case model.ActionDropTable:
droppedTableInfo := e.GetDropTableInfo()
droppedTableInfo := s.GetDropTableInfo()
ids, err := h.getTableIDs(droppedTableInfo)
if err != nil {
return err
@ -151,7 +104,7 @@ func (h *ddlHandlerImpl) HandleDDLEvent(t *util.DDLEvent) error {
}
}
case model.ActionAddColumn:
newTableInfo, newColumnInfo := e.GetAddColumnInfo()
newTableInfo, newColumnInfo := s.GetAddColumnInfo()
ids, err := h.getTableIDs(newTableInfo)
if err != nil {
return err
@ -162,7 +115,7 @@ func (h *ddlHandlerImpl) HandleDDLEvent(t *util.DDLEvent) error {
}
}
case model.ActionModifyColumn:
newTableInfo, modifiedColumnInfo := e.GetModifyColumnInfo()
newTableInfo, modifiedColumnInfo := s.GetModifyColumnInfo()
ids, err := h.getTableIDs(newTableInfo)
if err != nil {
return err
@ -173,30 +126,40 @@ func (h *ddlHandlerImpl) HandleDDLEvent(t *util.DDLEvent) error {
}
}
case model.ActionAddTablePartition:
globalTableInfo, addedPartitionInfo := e.GetAddPartitionInfo()
globalTableInfo, addedPartitionInfo := s.GetAddPartitionInfo()
for _, def := range addedPartitionInfo.Definitions {
if err := h.statsWriter.InsertTableStats2KV(globalTableInfo, def.ID); err != nil {
return err
}
}
case model.ActionTruncateTablePartition:
if err := h.onTruncatePartitions(e); err != nil {
if err := h.onTruncatePartitions(s); err != nil {
return err
}
case model.ActionDropTablePartition:
if err := h.onDropPartitions(e); err != nil {
if err := h.onDropPartitions(s); err != nil {
return err
}
// EXCHANGE PARTITION EVENT NOTES:
// 1. When a partition is exchanged with a system table, we need to adjust the global statistics
// based on the count delta and modify count delta. However, due to the involvement of the system table,
// a complete update of the global statistics is not feasible. Therefore, we bypass the statistics update
// for the table in this scenario. Despite this, the table id still changes, so the statistics for the
// system table will still be visible.
// 2. If the system table is a partitioned table, we will update the global statistics for the partitioned table.
// It is rare to exchange a partition from a system table, so we can ignore this case. In this case,
// the system table will have statistics, but this is not a significant issue.
// So we decided to completely ignore the system table event.
case model.ActionExchangeTablePartition:
if err := h.onExchangeAPartition(e); err != nil {
if err := h.onExchangeAPartition(s); err != nil {
return err
}
case model.ActionReorganizePartition:
if err := h.onReorganizePartitions(e); err != nil {
if err := h.onReorganizePartitions(s); err != nil {
return err
}
case model.ActionAlterTablePartitioning:
oldSingleTableID, globalTableInfo, addedPartInfo := e.GetAddPartitioningInfo()
oldSingleTableID, globalTableInfo, addedPartInfo := s.GetAddPartitioningInfo()
// Add new partition stats.
for _, def := range addedPartInfo.Definitions {
if err := h.statsWriter.InsertTableStats2KV(globalTableInfo, def.ID); err != nil {
@ -206,6 +169,25 @@ func (h *ddlHandlerImpl) HandleDDLEvent(t *util.DDLEvent) error {
// Change id for global stats, since the data has not changed!
// Note: This operation will update all tables related to statistics with the new ID.
return h.statsWriter.ChangeGlobalStatsID(oldSingleTableID, globalTableInfo.ID)
case model.ActionRemovePartitioning:
// Change id for global stats, since the data has not changed!
// Note: This operation will update all tables related to statistics with the new ID.
oldTblID, newSingleTableInfo, droppedPartInfo := s.GetRemovePartitioningInfo()
if err := h.statsWriter.ChangeGlobalStatsID(oldTblID, newSingleTableInfo.ID); err != nil {
return err
}
// Remove partition stats.
for _, def := range droppedPartInfo.Definitions {
if err := h.statsWriter.UpdateStatsMetaVersionForGC(def.ID); err != nil {
return err
}
}
case model.ActionFlashbackCluster:
return h.statsWriter.UpdateStatsVersion()
default:
intest.Assert(false)
logutil.StatsLogger().Error("Unhandled schema change event", zap.Stringer("type", s))
}
return nil
}
@ -303,6 +285,6 @@ func (h *ddlHandlerImpl) getTableIDs(tblInfo *model.TableInfo) (ids []int64, err
}
// DDLEventCh returns ddl events channel in handle.
func (h *ddlHandlerImpl) DDLEventCh() chan *util.DDLEvent {
func (h *ddlHandlerImpl) DDLEventCh() chan *ddlutil.SchemaChangeEvent {
return h.ddlEventCh
}

View File

@ -19,10 +19,10 @@ import (
"fmt"
"testing"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/planner/cardinality"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/mock"
@ -1302,13 +1302,10 @@ func TestAddPartitioning(t *testing.T) {
)
}
func findEvent(eventCh <-chan *util.DDLEvent, eventType model.ActionType) *util.DDLEvent {
func findEvent(eventCh <-chan *ddlutil.SchemaChangeEvent, eventType model.ActionType) *ddlutil.SchemaChangeEvent {
// Find the target event.
for {
event := <-eventCh
if event.SchemaChangeEvent.GetType() == eventType {
return event
}
if event.GetType() == eventType {
return event
}

View File

@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/types",
visibility = ["//visibility:public"],
deps = [
"//pkg/ddl/util",
"//pkg/infoschema",
"//pkg/meta/model",
"//pkg/parser/ast",

View File

@ -19,6 +19,7 @@ import (
"sync"
"time"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
@ -458,9 +459,9 @@ type StatsGlobal interface {
// DDL is used to handle ddl events.
type DDL interface {
// HandleDDLEvent handles ddl events.
HandleDDLEvent(event *statsutil.DDLEvent) error
HandleDDLEvent(changeEvent *ddlutil.SchemaChangeEvent) error
// DDLEventCh returns ddl events channel in handle.
DDLEventCh() chan *statsutil.DDLEvent
DDLEventCh() chan *ddlutil.SchemaChangeEvent
}
// StatsHandle is used to manage TiDB Statistics.

View File

@ -1,10 +1,9 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "util",
srcs = [
"auto_analyze_proc_id_generator.go",
"ddl_event.go",
"lease_getter.go",
"pool.go",
"table_info.go",
@ -13,10 +12,8 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/util",
visibility = ["//visibility:public"],
deps = [
"//pkg/ddl/util",
"//pkg/infoschema",
"//pkg/kv",
"//pkg/meta/model",
"//pkg/parser/terror",
"//pkg/planner/core/resolve",
"//pkg/sessionctx",
@ -36,16 +33,3 @@ go_library(
"@org_uber_go_atomic//:atomic",
],
)
go_test(
name = "util_test",
timeout = "short",
srcs = ["ddl_event_test.go"],
embed = [":util"],
flaky = True,
deps = [
"//pkg/meta/model",
"//pkg/parser/model",
"@com_github_stretchr_testify//require",
],
)

View File

@ -1,131 +0,0 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"fmt"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
)
// DDLEvent contains the information of a ddl event that is used to update stats.
type DDLEvent struct {
// todo: replace DDLEvent by SchemaChangeEvent gradually
SchemaChangeEvent *ddlutil.SchemaChangeEvent
// For different ddl types, the following fields are used.
// They have different meanings for different ddl types.
// Please do **not** use these fields directly, use the corresponding
// NewXXXEvent functions instead.
tableInfo *model.TableInfo
partInfo *model.PartitionInfo
oldTableInfo *model.TableInfo
oldPartInfo *model.PartitionInfo
columnInfos []*model.ColumnInfo
// schemaID is the ID of the schema that the table belongs to.
// Used to filter out the system or memory tables.
schemaID int64
// This value is used to store the table ID during a transition.
// It applies when a table structure is being changed from partitioned to non-partitioned, or vice versa.
oldTableID int64
tp model.ActionType
}
// IsMemOrSysDB checks whether the table is in the memory or system database.
func (e *DDLEvent) IsMemOrSysDB(sctx sessionctx.Context) (bool, error) {
intest.Assert(e.schemaID != 0, "schemaID should not be 0, please set it when creating the event")
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
schema, ok := is.SchemaByID(e.schemaID)
if !ok {
return false, fmt.Errorf("schema not found for table %s", e.tableInfo.Name)
}
return util.IsMemOrSysDB(schema.Name.L), nil
}
// NewRemovePartitioningEvent creates a new ddl event that converts a partitioned table to a single table.
// For example, `alter table t remove partitioning`.
func NewRemovePartitioningEvent(
schemaID int64,
oldPartitionedTableID int64,
newSingleTableInfo *model.TableInfo,
droppedPartInfo *model.PartitionInfo,
) *DDLEvent {
return &DDLEvent{
tp: model.ActionRemovePartitioning,
schemaID: schemaID,
oldTableID: oldPartitionedTableID,
tableInfo: newSingleTableInfo,
oldPartInfo: droppedPartInfo,
}
}
// GetRemovePartitioningInfo gets the table info of the table that is converted to a single table.
func (e *DDLEvent) GetRemovePartitioningInfo() (
oldPartitionedTableID int64,
newSingleTableInfo *model.TableInfo,
droppedPartInfo *model.PartitionInfo,
) {
return e.oldTableID, e.tableInfo, e.oldPartInfo
}
// NewFlashbackClusterEvent creates a new ddl event that flashes back the cluster.
func NewFlashbackClusterEvent() *DDLEvent {
return &DDLEvent{
tp: model.ActionFlashbackCluster,
}
}
// GetType returns the type of the ddl event.
func (e *DDLEvent) GetType() model.ActionType {
return e.tp
}
// String implements fmt.Stringer interface.
func (e *DDLEvent) String() string {
ret := fmt.Sprintf("(Event Type: %s", e.tp)
if e.schemaID != 0 {
ret += fmt.Sprintf(", Schema ID: %d", e.schemaID)
}
if e.tableInfo != nil {
ret += fmt.Sprintf(", Table ID: %d, Table Name: %s", e.tableInfo.ID, e.tableInfo.Name)
}
if e.partInfo != nil {
ids := make([]int64, 0, len(e.partInfo.Definitions))
for _, def := range e.partInfo.Definitions {
ids = append(ids, def.ID)
}
ret += fmt.Sprintf(", Partition IDs: %v", ids)
}
if e.oldTableInfo != nil {
ret += fmt.Sprintf(", Old Table ID: %d, Old Table Name: %s", e.oldTableInfo.ID, e.oldTableInfo.Name)
}
if e.oldPartInfo != nil {
ids := make([]int64, 0, len(e.oldPartInfo.Definitions))
for _, def := range e.oldPartInfo.Definitions {
ids = append(ids, def.ID)
}
ret += fmt.Sprintf(", Old Partition IDs: %v", ids)
}
for _, columnInfo := range e.columnInfos {
ret += fmt.Sprintf(", Column ID: %d, Column Name: %s", columnInfo.ID, columnInfo.Name)
}
return ret
}

View File

@ -1,65 +0,0 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"testing"
"github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/stretchr/testify/require"
)
func TestEventString(t *testing.T) {
// Create an Event object
e := &DDLEvent{
tp: model.ActionAddColumn,
schemaID: 1,
tableInfo: &model.TableInfo{
ID: 1,
Name: pmodel.NewCIStr("Table1"),
},
partInfo: &model.PartitionInfo{
Definitions: []model.PartitionDefinition{
{ID: 2},
{ID: 3},
},
},
oldTableInfo: &model.TableInfo{
ID: 4,
Name: pmodel.NewCIStr("Table2"),
},
oldPartInfo: &model.PartitionInfo{
Definitions: []model.PartitionDefinition{
{ID: 5},
{ID: 6},
},
},
columnInfos: []*model.ColumnInfo{
{ID: 7, Name: pmodel.NewCIStr("Column1")},
{ID: 8, Name: pmodel.NewCIStr("Column2")},
},
}
// Call the String method
result := e.String()
// Check the result
expected := "(Event Type: add column, Schema ID: 1, Table ID: 1, Table Name: Table1, " +
"Partition IDs: [2 3], Old Table ID: 4, Old Table Name: Table2, " +
"Old Partition IDs: [5 6], Column ID: 7, Column Name: Column1, " +
"Column ID: 8, Column Name: Column2"
require.Equal(t, expected, result)
}