ddl_notifier: implement serialization and deserialization for SchemaChangeEvent (#56089)

ref pingcap/tidb#55722
This commit is contained in:
fzzf678
2024-09-18 15:31:40 +08:00
committed by GitHub
parent 8313bbc5c0
commit 7ce6ec3333
5 changed files with 184 additions and 120 deletions

View File

@ -3,8 +3,8 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "notifier",
srcs = [
"events.go",
"publish.go",
"schema_change_notifier.go",
"store.go",
],
importpath = "github.com/pingcap/tidb/pkg/ddl/notifier",
@ -13,6 +13,7 @@ go_library(
"//pkg/ddl/session",
"//pkg/meta/model",
"//pkg/util/intest",
"@com_github_pingcap_errors//:errors",
],
)
@ -20,8 +21,8 @@ go_test(
name = "notifier_test",
timeout = "short",
srcs = [
"events_test.go",
"publish_testkit_test.go",
"schema_change_notifier_test.go",
],
embed = [":notifier"],
flaky = True,

View File

@ -15,6 +15,7 @@
package notifier
import (
"encoding/json"
"fmt"
"strings"
@ -27,16 +28,7 @@ import (
// check the GetType of SchemaChange and call the corresponding getter function
// to retrieve the needed information.
type SchemaChangeEvent struct {
tableInfo *model.TableInfo
oldTableInfo *model.TableInfo
addedPartInfo *model.PartitionInfo
droppedPartInfo *model.PartitionInfo
columnInfos []*model.ColumnInfo
// oldTableID4Partition is used to store the table ID when a table transitions from being partitioned to non-partitioned,
// or vice versa.
oldTableID4Partition int64
tp model.ActionType
inner *jsonSchemaChangeEvent
}
// String implements fmt.Stringer interface.
@ -46,33 +38,33 @@ func (s *SchemaChangeEvent) String() string {
}
var sb strings.Builder
_, _ = fmt.Fprintf(&sb, "(Event Type: %s", s.tp)
if s.tableInfo != nil {
_, _ = fmt.Fprintf(&sb, ", Table ID: %d, Table Name: %s", s.tableInfo.ID, s.tableInfo.Name)
_, _ = fmt.Fprintf(&sb, "(Event Type: %s", s.inner.Tp)
if s.inner.TableInfo != nil {
_, _ = fmt.Fprintf(&sb, ", Table ID: %d, Table Name: %s", s.inner.TableInfo.ID, s.inner.TableInfo.Name)
}
if s.oldTableInfo != nil {
_, _ = fmt.Fprintf(&sb, ", Old Table ID: %d, Old Table Name: %s", s.oldTableInfo.ID, s.oldTableInfo.Name)
if s.inner.OldTableInfo != nil {
_, _ = fmt.Fprintf(&sb, ", Old Table ID: %d, Old Table Name: %s", s.inner.OldTableInfo.ID, s.inner.OldTableInfo.Name)
}
if s.oldTableID4Partition != 0 {
_, _ = fmt.Fprintf(&sb, ", Old Table ID for Partition: %d", s.oldTableID4Partition)
if s.inner.OldTableID4Partition != 0 {
_, _ = fmt.Fprintf(&sb, ", Old Table ID for Partition: %d", s.inner.OldTableID4Partition)
}
if s.addedPartInfo != nil {
for _, partDef := range s.addedPartInfo.Definitions {
if s.inner.AddedPartInfo != nil {
for _, partDef := range s.inner.AddedPartInfo.Definitions {
if partDef.Name.L != "" {
_, _ = fmt.Fprintf(&sb, ", Partition Name: %s", partDef.Name)
}
_, _ = fmt.Fprintf(&sb, ", Partition ID: %d", partDef.ID)
}
}
if s.droppedPartInfo != nil {
for _, partDef := range s.droppedPartInfo.Definitions {
if s.inner.DroppedPartInfo != nil {
for _, partDef := range s.inner.DroppedPartInfo.Definitions {
if partDef.Name.L != "" {
_, _ = fmt.Fprintf(&sb, ", Dropped Partition Name: %s", partDef.Name)
}
_, _ = fmt.Fprintf(&sb, ", Dropped Partition ID: %d", partDef.ID)
}
}
for _, columnInfo := range s.columnInfos {
for _, columnInfo := range s.inner.ColumnInfos {
_, _ = fmt.Fprintf(&sb, ", Column ID: %d, Column Name: %s", columnInfo.ID, columnInfo.Name)
}
sb.WriteString(")")
@ -85,7 +77,7 @@ func (s *SchemaChangeEvent) GetType() model.ActionType {
if s == nil {
return model.ActionNone
}
return s.tp
return s.inner.Tp
}
// NewCreateTableEvent creates a SchemaChangeEvent whose type is
@ -94,16 +86,18 @@ func NewCreateTableEvent(
newTableInfo *model.TableInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionCreateTable,
tableInfo: newTableInfo,
inner: &jsonSchemaChangeEvent{
Tp: model.ActionCreateTable,
TableInfo: newTableInfo,
},
}
}
// GetCreateTableInfo returns the table info of the SchemaChangeEvent whose type
// is ActionCreateTable.
func (s *SchemaChangeEvent) GetCreateTableInfo() *model.TableInfo {
intest.Assert(s.tp == model.ActionCreateTable)
return s.tableInfo
intest.Assert(s.inner.Tp == model.ActionCreateTable)
return s.inner.TableInfo
}
// NewTruncateTableEvent creates a SchemaChangeEvent whose type is
@ -113,9 +107,11 @@ func NewTruncateTableEvent(
droppedTableInfo *model.TableInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionTruncateTable,
tableInfo: newTableInfo,
oldTableInfo: droppedTableInfo,
inner: &jsonSchemaChangeEvent{
Tp: model.ActionTruncateTable,
TableInfo: newTableInfo,
OldTableInfo: droppedTableInfo,
},
}
}
@ -125,8 +121,8 @@ func (s *SchemaChangeEvent) GetTruncateTableInfo() (
newTableInfo *model.TableInfo,
droppedTableInfo *model.TableInfo,
) {
intest.Assert(s.tp == model.ActionTruncateTable)
return s.tableInfo, s.oldTableInfo
intest.Assert(s.inner.Tp == model.ActionTruncateTable)
return s.inner.TableInfo, s.inner.OldTableInfo
}
// NewDropTableEvent creates a SchemaChangeEvent whose type is ActionDropTable.
@ -134,16 +130,18 @@ func NewDropTableEvent(
droppedTableInfo *model.TableInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionDropTable,
oldTableInfo: droppedTableInfo,
inner: &jsonSchemaChangeEvent{
Tp: model.ActionDropTable,
OldTableInfo: droppedTableInfo,
},
}
}
// GetDropTableInfo returns the table info of the SchemaChangeEvent whose type is
// ActionDropTable.
func (s *SchemaChangeEvent) GetDropTableInfo() (droppedTableInfo *model.TableInfo) {
intest.Assert(s.tp == model.ActionDropTable)
return s.oldTableInfo
intest.Assert(s.inner.Tp == model.ActionDropTable)
return s.inner.OldTableInfo
}
// NewAddColumnEvent creates a SchemaChangeEvent whose type is ActionAddColumn.
@ -152,9 +150,11 @@ func NewAddColumnEvent(
newColumnInfos []*model.ColumnInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionAddColumn,
tableInfo: tableInfo,
columnInfos: newColumnInfos,
inner: &jsonSchemaChangeEvent{
Tp: model.ActionAddColumn,
TableInfo: tableInfo,
ColumnInfos: newColumnInfos,
},
}
}
@ -164,8 +164,8 @@ func (s *SchemaChangeEvent) GetAddColumnInfo() (
tableInfo *model.TableInfo,
columnInfos []*model.ColumnInfo,
) {
intest.Assert(s.tp == model.ActionAddColumn)
return s.tableInfo, s.columnInfos
intest.Assert(s.inner.Tp == model.ActionAddColumn)
return s.inner.TableInfo, s.inner.ColumnInfos
}
// NewModifyColumnEvent creates a SchemaChangeEvent whose type is
@ -175,9 +175,11 @@ func NewModifyColumnEvent(
modifiedColumnInfo []*model.ColumnInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionModifyColumn,
tableInfo: tableInfo,
columnInfos: modifiedColumnInfo,
inner: &jsonSchemaChangeEvent{
Tp: model.ActionModifyColumn,
TableInfo: tableInfo,
ColumnInfos: modifiedColumnInfo,
},
}
}
@ -187,8 +189,8 @@ func (s *SchemaChangeEvent) GetModifyColumnInfo() (
newTableInfo *model.TableInfo,
modifiedColumnInfo []*model.ColumnInfo,
) {
intest.Assert(s.tp == model.ActionModifyColumn)
return s.tableInfo, s.columnInfos
intest.Assert(s.inner.Tp == model.ActionModifyColumn)
return s.inner.TableInfo, s.inner.ColumnInfos
}
// NewAddPartitionEvent creates a SchemaChangeEvent whose type is
@ -198,9 +200,11 @@ func NewAddPartitionEvent(
addedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionAddTablePartition,
tableInfo: globalTableInfo,
addedPartInfo: addedPartInfo,
inner: &jsonSchemaChangeEvent{
Tp: model.ActionAddTablePartition,
TableInfo: globalTableInfo,
AddedPartInfo: addedPartInfo,
},
}
}
@ -210,8 +214,8 @@ func (s *SchemaChangeEvent) GetAddPartitionInfo() (
globalTableInfo *model.TableInfo,
addedPartInfo *model.PartitionInfo,
) {
intest.Assert(s.tp == model.ActionAddTablePartition)
return s.tableInfo, s.addedPartInfo
intest.Assert(s.inner.Tp == model.ActionAddTablePartition)
return s.inner.TableInfo, s.inner.AddedPartInfo
}
// NewTruncatePartitionEvent creates a SchemaChangeEvent whose type is
@ -222,10 +226,12 @@ func NewTruncatePartitionEvent(
droppedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionTruncateTablePartition,
tableInfo: globalTableInfo,
addedPartInfo: addedPartInfo,
droppedPartInfo: droppedPartInfo,
inner: &jsonSchemaChangeEvent{
Tp: model.ActionTruncateTablePartition,
TableInfo: globalTableInfo,
AddedPartInfo: addedPartInfo,
DroppedPartInfo: droppedPartInfo,
},
}
}
@ -237,8 +243,8 @@ func (s *SchemaChangeEvent) GetTruncatePartitionInfo() (
addedPartInfo *model.PartitionInfo,
droppedPartInfo *model.PartitionInfo,
) {
intest.Assert(s.tp == model.ActionTruncateTablePartition)
return s.tableInfo, s.addedPartInfo, s.droppedPartInfo
intest.Assert(s.inner.Tp == model.ActionTruncateTablePartition)
return s.inner.TableInfo, s.inner.AddedPartInfo, s.inner.DroppedPartInfo
}
// NewDropPartitionEvent creates a SchemaChangeEvent whose type is
@ -248,9 +254,11 @@ func NewDropPartitionEvent(
droppedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionDropTablePartition,
tableInfo: globalTableInfo,
droppedPartInfo: droppedPartInfo,
inner: &jsonSchemaChangeEvent{
Tp: model.ActionDropTablePartition,
TableInfo: globalTableInfo,
DroppedPartInfo: droppedPartInfo,
},
}
}
@ -260,8 +268,8 @@ func (s *SchemaChangeEvent) GetDropPartitionInfo() (
globalTableInfo *model.TableInfo,
droppedPartInfo *model.PartitionInfo,
) {
intest.Assert(s.tp == model.ActionDropTablePartition)
return s.tableInfo, s.droppedPartInfo
intest.Assert(s.inner.Tp == model.ActionDropTablePartition)
return s.inner.TableInfo, s.inner.DroppedPartInfo
}
// NewExchangePartitionEvent creates a SchemaChangeEvent whose type is
@ -272,10 +280,12 @@ func NewExchangePartitionEvent(
nonPartTableInfo *model.TableInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionExchangeTablePartition,
tableInfo: globalTableInfo,
addedPartInfo: partInfo,
oldTableInfo: nonPartTableInfo,
inner: &jsonSchemaChangeEvent{
Tp: model.ActionExchangeTablePartition,
TableInfo: globalTableInfo,
AddedPartInfo: partInfo,
OldTableInfo: nonPartTableInfo,
},
}
}
@ -287,8 +297,8 @@ func (s *SchemaChangeEvent) GetExchangePartitionInfo() (
partInfo *model.PartitionInfo,
nonPartTableInfo *model.TableInfo,
) {
intest.Assert(s.tp == model.ActionExchangeTablePartition)
return s.tableInfo, s.addedPartInfo, s.oldTableInfo
intest.Assert(s.inner.Tp == model.ActionExchangeTablePartition)
return s.inner.TableInfo, s.inner.AddedPartInfo, s.inner.OldTableInfo
}
// NewReorganizePartitionEvent creates a SchemaChangeEvent whose type is
@ -299,10 +309,12 @@ func NewReorganizePartitionEvent(
droppedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionReorganizePartition,
tableInfo: globalTableInfo,
addedPartInfo: addedPartInfo,
droppedPartInfo: droppedPartInfo,
inner: &jsonSchemaChangeEvent{
Tp: model.ActionReorganizePartition,
TableInfo: globalTableInfo,
AddedPartInfo: addedPartInfo,
DroppedPartInfo: droppedPartInfo,
},
}
}
@ -314,8 +326,8 @@ func (s *SchemaChangeEvent) GetReorganizePartitionInfo() (
addedPartInfo *model.PartitionInfo,
droppedPartInfo *model.PartitionInfo,
) {
intest.Assert(s.tp == model.ActionReorganizePartition)
return s.tableInfo, s.addedPartInfo, s.droppedPartInfo
intest.Assert(s.inner.Tp == model.ActionReorganizePartition)
return s.inner.TableInfo, s.inner.AddedPartInfo, s.inner.DroppedPartInfo
}
// NewAddPartitioningEvent creates a SchemaChangeEvent whose type is
@ -328,10 +340,12 @@ func NewAddPartitioningEvent(
addedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionAlterTablePartitioning,
oldTableID4Partition: nonPartTableID,
tableInfo: newGlobalTableInfo,
addedPartInfo: addedPartInfo,
inner: &jsonSchemaChangeEvent{
Tp: model.ActionAlterTablePartitioning,
OldTableID4Partition: nonPartTableID,
TableInfo: newGlobalTableInfo,
AddedPartInfo: addedPartInfo,
},
}
}
@ -343,8 +357,8 @@ func (s *SchemaChangeEvent) GetAddPartitioningInfo() (
newGlobalTableInfo *model.TableInfo,
addedPartInfo *model.PartitionInfo,
) {
intest.Assert(s.tp == model.ActionAlterTablePartitioning)
return s.oldTableID4Partition, s.tableInfo, s.addedPartInfo
intest.Assert(s.inner.Tp == model.ActionAlterTablePartitioning)
return s.inner.OldTableID4Partition, s.inner.TableInfo, s.inner.AddedPartInfo
}
// NewRemovePartitioningEvent creates a schema change event whose type is
@ -355,10 +369,12 @@ func NewRemovePartitioningEvent(
droppedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionRemovePartitioning,
oldTableID4Partition: oldPartitionedTableID,
tableInfo: nonPartitionTableInfo,
droppedPartInfo: droppedPartInfo,
inner: &jsonSchemaChangeEvent{
Tp: model.ActionRemovePartitioning,
OldTableID4Partition: oldPartitionedTableID,
TableInfo: nonPartitionTableInfo,
DroppedPartInfo: droppedPartInfo,
},
}
}
@ -369,14 +385,46 @@ func (s *SchemaChangeEvent) GetRemovePartitioningInfo() (
newSingleTableInfo *model.TableInfo,
droppedPartInfo *model.PartitionInfo,
) {
intest.Assert(s.tp == model.ActionRemovePartitioning)
return s.oldTableID4Partition, s.tableInfo, s.droppedPartInfo
intest.Assert(s.inner.Tp == model.ActionRemovePartitioning)
return s.inner.OldTableID4Partition, s.inner.TableInfo, s.inner.DroppedPartInfo
}
// NewFlashbackClusterEvent creates a schema change event whose type is
// ActionFlashbackCluster.
func NewFlashbackClusterEvent() *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionFlashbackCluster,
inner: &jsonSchemaChangeEvent{
Tp: model.ActionFlashbackCluster,
},
}
}
// jsonSchemaChangeEvent is used by SchemaChangeEvent when needed to (un)marshal data,
// we want to hide the details to subscribers, so SchemaChangeEvent contain this struct.
type jsonSchemaChangeEvent struct {
TableInfo *model.TableInfo `json:"table_info,omitempty"`
OldTableInfo *model.TableInfo `json:"old_table_info,omitempty"`
AddedPartInfo *model.PartitionInfo `json:"added_partition_info,omitempty"`
DroppedPartInfo *model.PartitionInfo `json:"dropped_partition_info,omitempty"`
ColumnInfos []*model.ColumnInfo `json:"column_infos,omitempty"`
// OldTableID4Partition is used to store the table ID when a table transitions from being partitioned to non-partitioned,
// or vice versa.
OldTableID4Partition int64 `json:"old_table_id_for_partition,omitempty"`
Tp model.ActionType `json:"type,omitempty"`
}
// MarshalJSON implements the json.Marshaler interface.
func (s *SchemaChangeEvent) MarshalJSON() ([]byte, error) {
return json.Marshal(s.inner)
}
// UnmarshalJSON implements the json.Unmarshaler interface.
func (s *SchemaChangeEvent) UnmarshalJSON(b []byte) error {
var j jsonSchemaChangeEvent
err := json.Unmarshal(b, &j)
if err != nil {
s.inner = &j
}
return err
}

View File

@ -25,30 +25,32 @@ import (
func TestEventString(t *testing.T) {
// Create an Event object
e := &SchemaChangeEvent{
tp: model.ActionAddColumn,
tableInfo: &model.TableInfo{
ID: 1,
Name: pmodel.NewCIStr("Table1"),
},
addedPartInfo: &model.PartitionInfo{
Definitions: []model.PartitionDefinition{
{ID: 2},
{ID: 3},
inner: &jsonSchemaChangeEvent{
Tp: model.ActionAddColumn,
TableInfo: &model.TableInfo{
ID: 1,
Name: pmodel.NewCIStr("Table1"),
},
},
oldTableInfo: &model.TableInfo{
ID: 4,
Name: pmodel.NewCIStr("Table2"),
},
droppedPartInfo: &model.PartitionInfo{
Definitions: []model.PartitionDefinition{
{ID: 5},
{ID: 6},
AddedPartInfo: &model.PartitionInfo{
Definitions: []model.PartitionDefinition{
{ID: 2},
{ID: 3},
},
},
OldTableInfo: &model.TableInfo{
ID: 4,
Name: pmodel.NewCIStr("Table2"),
},
DroppedPartInfo: &model.PartitionInfo{
Definitions: []model.PartitionDefinition{
{ID: 5},
{ID: 6},
},
},
ColumnInfos: []*model.ColumnInfo{
{ID: 7, Name: pmodel.NewCIStr("Column1")},
{ID: 8, Name: pmodel.NewCIStr("Column2")},
},
},
columnInfos: []*model.ColumnInfo{
{ID: 7, Name: pmodel.NewCIStr("Column1")},
{ID: 8, Name: pmodel.NewCIStr("Column2")},
},
}

View File

@ -20,6 +20,8 @@ import (
"github.com/pingcap/tidb/pkg/ddl/notifier"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)
@ -33,7 +35,7 @@ func TestPublishToTableStore(t *testing.T) {
CREATE TABLE ddl_notifier (
ddl_job_id BIGINT,
multi_schema_change_seq BIGINT COMMENT '-1 if the schema change does not belong to a multi-schema change DDL. 0 or positive numbers representing the sub-job index of a multi-schema change DDL',
schema_change JSON COMMENT 'SchemaChange at rest',
schema_change LONGBLOB COMMENT 'SchemaChangeEvent at rest',
processed_by_flag BIGINT UNSIGNED DEFAULT 0 COMMENT 'flag to mark which subscriber has processed the event',
PRIMARY KEY(ddl_job_id, multi_schema_change_seq)
)
@ -42,11 +44,12 @@ CREATE TABLE ddl_notifier (
ctx := context.Background()
s := notifier.OpenTableStore("test", "ddl_notifier")
se := sess.NewSession(tk.Session())
err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, nil, s)
event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: pmodel.NewCIStr("t1")})
err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
require.NoError(t, err)
err = notifier.PubSchemeChangeToStore(ctx, se, 2, -1, nil, s)
event2 := notifier.NewDropTableEvent(&model.TableInfo{ID: 1001, Name: pmodel.NewCIStr("t2")})
err = notifier.PubSchemeChangeToStore(ctx, se, 2, -1, event2, s)
require.NoError(t, err)
got, err := s.List(ctx, se, 1)
require.NoError(t, err)
require.Len(t, got, 1)

View File

@ -16,8 +16,10 @@ package notifier
import (
"context"
"encoding/json"
"fmt"
"github.com/pingcap/errors"
sess "github.com/pingcap/tidb/pkg/ddl/session"
)
@ -44,7 +46,10 @@ type tableStore struct {
}
func (t *tableStore) Insert(ctx context.Context, s *sess.Session, change *schemaChange) error {
// TODO: fill schema_change after we implement JSON serialization.
event, err := json.Marshal(change.event)
if err != nil {
return errors.Trace(err)
}
sql := fmt.Sprintf(`
INSERT INTO %s.%s (
ddl_job_id,
@ -53,9 +58,9 @@ func (t *tableStore) Insert(ctx context.Context, s *sess.Session, change *schema
processed_by_flag
) VALUES (%d, %d, '%s', 0)`,
t.db, t.table,
change.ddlJobID, change.multiSchemaChangeSeq, "{}",
change.ddlJobID, change.multiSchemaChangeSeq, event,
)
_, err := s.Execute(ctx, sql, "ddl_notifier")
_, err = s.Execute(ctx, sql, "ddl_notifier")
return err
}
@ -88,11 +93,16 @@ func (t *tableStore) List(ctx context.Context, se *sess.Session, limit int) ([]*
}
ret := make([]*schemaChange, 0, len(rows))
for _, row := range rows {
event := SchemaChangeEvent{}
err = json.Unmarshal(row.GetBytes(2), &event)
if err != nil {
return nil, errors.Trace(err)
}
ret = append(ret, &schemaChange{
ddlJobID: row.GetInt64(0),
multiSchemaChangeSeq: row.GetInt64(1),
// TODO: fill schema_change after we implement JSON serialization.
processedByFlag: row.GetUint64(3),
event: &event,
processedByFlag: row.GetUint64(3),
})
}
return ret, nil