From 7ce6ec333359caf56ab60a074e3463d336fbb746 Mon Sep 17 00:00:00 2001 From: fzzf678 <108643977+fzzf678@users.noreply.github.com> Date: Wed, 18 Sep 2024 15:31:40 +0800 Subject: [PATCH] ddl_notifier: implement serialization and deserialization for `SchemaChangeEvent` (#56089) ref pingcap/tidb#55722 --- pkg/ddl/notifier/BUILD.bazel | 5 +- .../{schema_change_notifier.go => events.go} | 222 +++++++++++------- ...change_notifier_test.go => events_test.go} | 46 ++-- pkg/ddl/notifier/publish_testkit_test.go | 11 +- pkg/ddl/notifier/store.go | 20 +- 5 files changed, 184 insertions(+), 120 deletions(-) rename pkg/ddl/notifier/{schema_change_notifier.go => events.go} (62%) rename pkg/ddl/notifier/{schema_change_notifier_test.go => events_test.go} (67%) diff --git a/pkg/ddl/notifier/BUILD.bazel b/pkg/ddl/notifier/BUILD.bazel index 298a7e859f..76aa3c9136 100644 --- a/pkg/ddl/notifier/BUILD.bazel +++ b/pkg/ddl/notifier/BUILD.bazel @@ -3,8 +3,8 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "notifier", srcs = [ + "events.go", "publish.go", - "schema_change_notifier.go", "store.go", ], importpath = "github.com/pingcap/tidb/pkg/ddl/notifier", @@ -13,6 +13,7 @@ go_library( "//pkg/ddl/session", "//pkg/meta/model", "//pkg/util/intest", + "@com_github_pingcap_errors//:errors", ], ) @@ -20,8 +21,8 @@ go_test( name = "notifier_test", timeout = "short", srcs = [ + "events_test.go", "publish_testkit_test.go", - "schema_change_notifier_test.go", ], embed = [":notifier"], flaky = True, diff --git a/pkg/ddl/notifier/schema_change_notifier.go b/pkg/ddl/notifier/events.go similarity index 62% rename from pkg/ddl/notifier/schema_change_notifier.go rename to pkg/ddl/notifier/events.go index 90fdf1c48c..dfba781dd8 100644 --- a/pkg/ddl/notifier/schema_change_notifier.go +++ b/pkg/ddl/notifier/events.go @@ -15,6 +15,7 @@ package notifier import ( + "encoding/json" "fmt" "strings" @@ -27,16 +28,7 @@ import ( // check the GetType of SchemaChange and call the corresponding getter function // to retrieve the needed information. type SchemaChangeEvent struct { - tableInfo *model.TableInfo - oldTableInfo *model.TableInfo - addedPartInfo *model.PartitionInfo - droppedPartInfo *model.PartitionInfo - columnInfos []*model.ColumnInfo - // oldTableID4Partition is used to store the table ID when a table transitions from being partitioned to non-partitioned, - // or vice versa. - oldTableID4Partition int64 - - tp model.ActionType + inner *jsonSchemaChangeEvent } // String implements fmt.Stringer interface. @@ -46,33 +38,33 @@ func (s *SchemaChangeEvent) String() string { } var sb strings.Builder - _, _ = fmt.Fprintf(&sb, "(Event Type: %s", s.tp) - if s.tableInfo != nil { - _, _ = fmt.Fprintf(&sb, ", Table ID: %d, Table Name: %s", s.tableInfo.ID, s.tableInfo.Name) + _, _ = fmt.Fprintf(&sb, "(Event Type: %s", s.inner.Tp) + if s.inner.TableInfo != nil { + _, _ = fmt.Fprintf(&sb, ", Table ID: %d, Table Name: %s", s.inner.TableInfo.ID, s.inner.TableInfo.Name) } - if s.oldTableInfo != nil { - _, _ = fmt.Fprintf(&sb, ", Old Table ID: %d, Old Table Name: %s", s.oldTableInfo.ID, s.oldTableInfo.Name) + if s.inner.OldTableInfo != nil { + _, _ = fmt.Fprintf(&sb, ", Old Table ID: %d, Old Table Name: %s", s.inner.OldTableInfo.ID, s.inner.OldTableInfo.Name) } - if s.oldTableID4Partition != 0 { - _, _ = fmt.Fprintf(&sb, ", Old Table ID for Partition: %d", s.oldTableID4Partition) + if s.inner.OldTableID4Partition != 0 { + _, _ = fmt.Fprintf(&sb, ", Old Table ID for Partition: %d", s.inner.OldTableID4Partition) } - if s.addedPartInfo != nil { - for _, partDef := range s.addedPartInfo.Definitions { + if s.inner.AddedPartInfo != nil { + for _, partDef := range s.inner.AddedPartInfo.Definitions { if partDef.Name.L != "" { _, _ = fmt.Fprintf(&sb, ", Partition Name: %s", partDef.Name) } _, _ = fmt.Fprintf(&sb, ", Partition ID: %d", partDef.ID) } } - if s.droppedPartInfo != nil { - for _, partDef := range s.droppedPartInfo.Definitions { + if s.inner.DroppedPartInfo != nil { + for _, partDef := range s.inner.DroppedPartInfo.Definitions { if partDef.Name.L != "" { _, _ = fmt.Fprintf(&sb, ", Dropped Partition Name: %s", partDef.Name) } _, _ = fmt.Fprintf(&sb, ", Dropped Partition ID: %d", partDef.ID) } } - for _, columnInfo := range s.columnInfos { + for _, columnInfo := range s.inner.ColumnInfos { _, _ = fmt.Fprintf(&sb, ", Column ID: %d, Column Name: %s", columnInfo.ID, columnInfo.Name) } sb.WriteString(")") @@ -85,7 +77,7 @@ func (s *SchemaChangeEvent) GetType() model.ActionType { if s == nil { return model.ActionNone } - return s.tp + return s.inner.Tp } // NewCreateTableEvent creates a SchemaChangeEvent whose type is @@ -94,16 +86,18 @@ func NewCreateTableEvent( newTableInfo *model.TableInfo, ) *SchemaChangeEvent { return &SchemaChangeEvent{ - tp: model.ActionCreateTable, - tableInfo: newTableInfo, + inner: &jsonSchemaChangeEvent{ + Tp: model.ActionCreateTable, + TableInfo: newTableInfo, + }, } } // GetCreateTableInfo returns the table info of the SchemaChangeEvent whose type // is ActionCreateTable. func (s *SchemaChangeEvent) GetCreateTableInfo() *model.TableInfo { - intest.Assert(s.tp == model.ActionCreateTable) - return s.tableInfo + intest.Assert(s.inner.Tp == model.ActionCreateTable) + return s.inner.TableInfo } // NewTruncateTableEvent creates a SchemaChangeEvent whose type is @@ -113,9 +107,11 @@ func NewTruncateTableEvent( droppedTableInfo *model.TableInfo, ) *SchemaChangeEvent { return &SchemaChangeEvent{ - tp: model.ActionTruncateTable, - tableInfo: newTableInfo, - oldTableInfo: droppedTableInfo, + inner: &jsonSchemaChangeEvent{ + Tp: model.ActionTruncateTable, + TableInfo: newTableInfo, + OldTableInfo: droppedTableInfo, + }, } } @@ -125,8 +121,8 @@ func (s *SchemaChangeEvent) GetTruncateTableInfo() ( newTableInfo *model.TableInfo, droppedTableInfo *model.TableInfo, ) { - intest.Assert(s.tp == model.ActionTruncateTable) - return s.tableInfo, s.oldTableInfo + intest.Assert(s.inner.Tp == model.ActionTruncateTable) + return s.inner.TableInfo, s.inner.OldTableInfo } // NewDropTableEvent creates a SchemaChangeEvent whose type is ActionDropTable. @@ -134,16 +130,18 @@ func NewDropTableEvent( droppedTableInfo *model.TableInfo, ) *SchemaChangeEvent { return &SchemaChangeEvent{ - tp: model.ActionDropTable, - oldTableInfo: droppedTableInfo, + inner: &jsonSchemaChangeEvent{ + Tp: model.ActionDropTable, + OldTableInfo: droppedTableInfo, + }, } } // GetDropTableInfo returns the table info of the SchemaChangeEvent whose type is // ActionDropTable. func (s *SchemaChangeEvent) GetDropTableInfo() (droppedTableInfo *model.TableInfo) { - intest.Assert(s.tp == model.ActionDropTable) - return s.oldTableInfo + intest.Assert(s.inner.Tp == model.ActionDropTable) + return s.inner.OldTableInfo } // NewAddColumnEvent creates a SchemaChangeEvent whose type is ActionAddColumn. @@ -152,9 +150,11 @@ func NewAddColumnEvent( newColumnInfos []*model.ColumnInfo, ) *SchemaChangeEvent { return &SchemaChangeEvent{ - tp: model.ActionAddColumn, - tableInfo: tableInfo, - columnInfos: newColumnInfos, + inner: &jsonSchemaChangeEvent{ + Tp: model.ActionAddColumn, + TableInfo: tableInfo, + ColumnInfos: newColumnInfos, + }, } } @@ -164,8 +164,8 @@ func (s *SchemaChangeEvent) GetAddColumnInfo() ( tableInfo *model.TableInfo, columnInfos []*model.ColumnInfo, ) { - intest.Assert(s.tp == model.ActionAddColumn) - return s.tableInfo, s.columnInfos + intest.Assert(s.inner.Tp == model.ActionAddColumn) + return s.inner.TableInfo, s.inner.ColumnInfos } // NewModifyColumnEvent creates a SchemaChangeEvent whose type is @@ -175,9 +175,11 @@ func NewModifyColumnEvent( modifiedColumnInfo []*model.ColumnInfo, ) *SchemaChangeEvent { return &SchemaChangeEvent{ - tp: model.ActionModifyColumn, - tableInfo: tableInfo, - columnInfos: modifiedColumnInfo, + inner: &jsonSchemaChangeEvent{ + Tp: model.ActionModifyColumn, + TableInfo: tableInfo, + ColumnInfos: modifiedColumnInfo, + }, } } @@ -187,8 +189,8 @@ func (s *SchemaChangeEvent) GetModifyColumnInfo() ( newTableInfo *model.TableInfo, modifiedColumnInfo []*model.ColumnInfo, ) { - intest.Assert(s.tp == model.ActionModifyColumn) - return s.tableInfo, s.columnInfos + intest.Assert(s.inner.Tp == model.ActionModifyColumn) + return s.inner.TableInfo, s.inner.ColumnInfos } // NewAddPartitionEvent creates a SchemaChangeEvent whose type is @@ -198,9 +200,11 @@ func NewAddPartitionEvent( addedPartInfo *model.PartitionInfo, ) *SchemaChangeEvent { return &SchemaChangeEvent{ - tp: model.ActionAddTablePartition, - tableInfo: globalTableInfo, - addedPartInfo: addedPartInfo, + inner: &jsonSchemaChangeEvent{ + Tp: model.ActionAddTablePartition, + TableInfo: globalTableInfo, + AddedPartInfo: addedPartInfo, + }, } } @@ -210,8 +214,8 @@ func (s *SchemaChangeEvent) GetAddPartitionInfo() ( globalTableInfo *model.TableInfo, addedPartInfo *model.PartitionInfo, ) { - intest.Assert(s.tp == model.ActionAddTablePartition) - return s.tableInfo, s.addedPartInfo + intest.Assert(s.inner.Tp == model.ActionAddTablePartition) + return s.inner.TableInfo, s.inner.AddedPartInfo } // NewTruncatePartitionEvent creates a SchemaChangeEvent whose type is @@ -222,10 +226,12 @@ func NewTruncatePartitionEvent( droppedPartInfo *model.PartitionInfo, ) *SchemaChangeEvent { return &SchemaChangeEvent{ - tp: model.ActionTruncateTablePartition, - tableInfo: globalTableInfo, - addedPartInfo: addedPartInfo, - droppedPartInfo: droppedPartInfo, + inner: &jsonSchemaChangeEvent{ + Tp: model.ActionTruncateTablePartition, + TableInfo: globalTableInfo, + AddedPartInfo: addedPartInfo, + DroppedPartInfo: droppedPartInfo, + }, } } @@ -237,8 +243,8 @@ func (s *SchemaChangeEvent) GetTruncatePartitionInfo() ( addedPartInfo *model.PartitionInfo, droppedPartInfo *model.PartitionInfo, ) { - intest.Assert(s.tp == model.ActionTruncateTablePartition) - return s.tableInfo, s.addedPartInfo, s.droppedPartInfo + intest.Assert(s.inner.Tp == model.ActionTruncateTablePartition) + return s.inner.TableInfo, s.inner.AddedPartInfo, s.inner.DroppedPartInfo } // NewDropPartitionEvent creates a SchemaChangeEvent whose type is @@ -248,9 +254,11 @@ func NewDropPartitionEvent( droppedPartInfo *model.PartitionInfo, ) *SchemaChangeEvent { return &SchemaChangeEvent{ - tp: model.ActionDropTablePartition, - tableInfo: globalTableInfo, - droppedPartInfo: droppedPartInfo, + inner: &jsonSchemaChangeEvent{ + Tp: model.ActionDropTablePartition, + TableInfo: globalTableInfo, + DroppedPartInfo: droppedPartInfo, + }, } } @@ -260,8 +268,8 @@ func (s *SchemaChangeEvent) GetDropPartitionInfo() ( globalTableInfo *model.TableInfo, droppedPartInfo *model.PartitionInfo, ) { - intest.Assert(s.tp == model.ActionDropTablePartition) - return s.tableInfo, s.droppedPartInfo + intest.Assert(s.inner.Tp == model.ActionDropTablePartition) + return s.inner.TableInfo, s.inner.DroppedPartInfo } // NewExchangePartitionEvent creates a SchemaChangeEvent whose type is @@ -272,10 +280,12 @@ func NewExchangePartitionEvent( nonPartTableInfo *model.TableInfo, ) *SchemaChangeEvent { return &SchemaChangeEvent{ - tp: model.ActionExchangeTablePartition, - tableInfo: globalTableInfo, - addedPartInfo: partInfo, - oldTableInfo: nonPartTableInfo, + inner: &jsonSchemaChangeEvent{ + Tp: model.ActionExchangeTablePartition, + TableInfo: globalTableInfo, + AddedPartInfo: partInfo, + OldTableInfo: nonPartTableInfo, + }, } } @@ -287,8 +297,8 @@ func (s *SchemaChangeEvent) GetExchangePartitionInfo() ( partInfo *model.PartitionInfo, nonPartTableInfo *model.TableInfo, ) { - intest.Assert(s.tp == model.ActionExchangeTablePartition) - return s.tableInfo, s.addedPartInfo, s.oldTableInfo + intest.Assert(s.inner.Tp == model.ActionExchangeTablePartition) + return s.inner.TableInfo, s.inner.AddedPartInfo, s.inner.OldTableInfo } // NewReorganizePartitionEvent creates a SchemaChangeEvent whose type is @@ -299,10 +309,12 @@ func NewReorganizePartitionEvent( droppedPartInfo *model.PartitionInfo, ) *SchemaChangeEvent { return &SchemaChangeEvent{ - tp: model.ActionReorganizePartition, - tableInfo: globalTableInfo, - addedPartInfo: addedPartInfo, - droppedPartInfo: droppedPartInfo, + inner: &jsonSchemaChangeEvent{ + Tp: model.ActionReorganizePartition, + TableInfo: globalTableInfo, + AddedPartInfo: addedPartInfo, + DroppedPartInfo: droppedPartInfo, + }, } } @@ -314,8 +326,8 @@ func (s *SchemaChangeEvent) GetReorganizePartitionInfo() ( addedPartInfo *model.PartitionInfo, droppedPartInfo *model.PartitionInfo, ) { - intest.Assert(s.tp == model.ActionReorganizePartition) - return s.tableInfo, s.addedPartInfo, s.droppedPartInfo + intest.Assert(s.inner.Tp == model.ActionReorganizePartition) + return s.inner.TableInfo, s.inner.AddedPartInfo, s.inner.DroppedPartInfo } // NewAddPartitioningEvent creates a SchemaChangeEvent whose type is @@ -328,10 +340,12 @@ func NewAddPartitioningEvent( addedPartInfo *model.PartitionInfo, ) *SchemaChangeEvent { return &SchemaChangeEvent{ - tp: model.ActionAlterTablePartitioning, - oldTableID4Partition: nonPartTableID, - tableInfo: newGlobalTableInfo, - addedPartInfo: addedPartInfo, + inner: &jsonSchemaChangeEvent{ + Tp: model.ActionAlterTablePartitioning, + OldTableID4Partition: nonPartTableID, + TableInfo: newGlobalTableInfo, + AddedPartInfo: addedPartInfo, + }, } } @@ -343,8 +357,8 @@ func (s *SchemaChangeEvent) GetAddPartitioningInfo() ( newGlobalTableInfo *model.TableInfo, addedPartInfo *model.PartitionInfo, ) { - intest.Assert(s.tp == model.ActionAlterTablePartitioning) - return s.oldTableID4Partition, s.tableInfo, s.addedPartInfo + intest.Assert(s.inner.Tp == model.ActionAlterTablePartitioning) + return s.inner.OldTableID4Partition, s.inner.TableInfo, s.inner.AddedPartInfo } // NewRemovePartitioningEvent creates a schema change event whose type is @@ -355,10 +369,12 @@ func NewRemovePartitioningEvent( droppedPartInfo *model.PartitionInfo, ) *SchemaChangeEvent { return &SchemaChangeEvent{ - tp: model.ActionRemovePartitioning, - oldTableID4Partition: oldPartitionedTableID, - tableInfo: nonPartitionTableInfo, - droppedPartInfo: droppedPartInfo, + inner: &jsonSchemaChangeEvent{ + Tp: model.ActionRemovePartitioning, + OldTableID4Partition: oldPartitionedTableID, + TableInfo: nonPartitionTableInfo, + DroppedPartInfo: droppedPartInfo, + }, } } @@ -369,14 +385,46 @@ func (s *SchemaChangeEvent) GetRemovePartitioningInfo() ( newSingleTableInfo *model.TableInfo, droppedPartInfo *model.PartitionInfo, ) { - intest.Assert(s.tp == model.ActionRemovePartitioning) - return s.oldTableID4Partition, s.tableInfo, s.droppedPartInfo + intest.Assert(s.inner.Tp == model.ActionRemovePartitioning) + return s.inner.OldTableID4Partition, s.inner.TableInfo, s.inner.DroppedPartInfo } // NewFlashbackClusterEvent creates a schema change event whose type is // ActionFlashbackCluster. func NewFlashbackClusterEvent() *SchemaChangeEvent { return &SchemaChangeEvent{ - tp: model.ActionFlashbackCluster, + inner: &jsonSchemaChangeEvent{ + Tp: model.ActionFlashbackCluster, + }, } } + +// jsonSchemaChangeEvent is used by SchemaChangeEvent when needed to (un)marshal data, +// we want to hide the details to subscribers, so SchemaChangeEvent contain this struct. +type jsonSchemaChangeEvent struct { + TableInfo *model.TableInfo `json:"table_info,omitempty"` + OldTableInfo *model.TableInfo `json:"old_table_info,omitempty"` + AddedPartInfo *model.PartitionInfo `json:"added_partition_info,omitempty"` + DroppedPartInfo *model.PartitionInfo `json:"dropped_partition_info,omitempty"` + ColumnInfos []*model.ColumnInfo `json:"column_infos,omitempty"` + // OldTableID4Partition is used to store the table ID when a table transitions from being partitioned to non-partitioned, + // or vice versa. + OldTableID4Partition int64 `json:"old_table_id_for_partition,omitempty"` + + Tp model.ActionType `json:"type,omitempty"` +} + +// MarshalJSON implements the json.Marshaler interface. +func (s *SchemaChangeEvent) MarshalJSON() ([]byte, error) { + return json.Marshal(s.inner) +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (s *SchemaChangeEvent) UnmarshalJSON(b []byte) error { + var j jsonSchemaChangeEvent + err := json.Unmarshal(b, &j) + if err != nil { + s.inner = &j + } + return err +} diff --git a/pkg/ddl/notifier/schema_change_notifier_test.go b/pkg/ddl/notifier/events_test.go similarity index 67% rename from pkg/ddl/notifier/schema_change_notifier_test.go rename to pkg/ddl/notifier/events_test.go index 511ffa1c11..f4222fad35 100644 --- a/pkg/ddl/notifier/schema_change_notifier_test.go +++ b/pkg/ddl/notifier/events_test.go @@ -25,30 +25,32 @@ import ( func TestEventString(t *testing.T) { // Create an Event object e := &SchemaChangeEvent{ - tp: model.ActionAddColumn, - tableInfo: &model.TableInfo{ - ID: 1, - Name: pmodel.NewCIStr("Table1"), - }, - addedPartInfo: &model.PartitionInfo{ - Definitions: []model.PartitionDefinition{ - {ID: 2}, - {ID: 3}, + inner: &jsonSchemaChangeEvent{ + Tp: model.ActionAddColumn, + TableInfo: &model.TableInfo{ + ID: 1, + Name: pmodel.NewCIStr("Table1"), }, - }, - oldTableInfo: &model.TableInfo{ - ID: 4, - Name: pmodel.NewCIStr("Table2"), - }, - droppedPartInfo: &model.PartitionInfo{ - Definitions: []model.PartitionDefinition{ - {ID: 5}, - {ID: 6}, + AddedPartInfo: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + {ID: 2}, + {ID: 3}, + }, + }, + OldTableInfo: &model.TableInfo{ + ID: 4, + Name: pmodel.NewCIStr("Table2"), + }, + DroppedPartInfo: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + {ID: 5}, + {ID: 6}, + }, + }, + ColumnInfos: []*model.ColumnInfo{ + {ID: 7, Name: pmodel.NewCIStr("Column1")}, + {ID: 8, Name: pmodel.NewCIStr("Column2")}, }, - }, - columnInfos: []*model.ColumnInfo{ - {ID: 7, Name: pmodel.NewCIStr("Column1")}, - {ID: 8, Name: pmodel.NewCIStr("Column2")}, }, } diff --git a/pkg/ddl/notifier/publish_testkit_test.go b/pkg/ddl/notifier/publish_testkit_test.go index 2f5b535c3d..0f345435d7 100644 --- a/pkg/ddl/notifier/publish_testkit_test.go +++ b/pkg/ddl/notifier/publish_testkit_test.go @@ -20,6 +20,8 @@ import ( "github.com/pingcap/tidb/pkg/ddl/notifier" sess "github.com/pingcap/tidb/pkg/ddl/session" + "github.com/pingcap/tidb/pkg/meta/model" + pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" ) @@ -33,7 +35,7 @@ func TestPublishToTableStore(t *testing.T) { CREATE TABLE ddl_notifier ( ddl_job_id BIGINT, multi_schema_change_seq BIGINT COMMENT '-1 if the schema change does not belong to a multi-schema change DDL. 0 or positive numbers representing the sub-job index of a multi-schema change DDL', - schema_change JSON COMMENT 'SchemaChange at rest', + schema_change LONGBLOB COMMENT 'SchemaChangeEvent at rest', processed_by_flag BIGINT UNSIGNED DEFAULT 0 COMMENT 'flag to mark which subscriber has processed the event', PRIMARY KEY(ddl_job_id, multi_schema_change_seq) ) @@ -42,11 +44,12 @@ CREATE TABLE ddl_notifier ( ctx := context.Background() s := notifier.OpenTableStore("test", "ddl_notifier") se := sess.NewSession(tk.Session()) - err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, nil, s) + event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: pmodel.NewCIStr("t1")}) + err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s) require.NoError(t, err) - err = notifier.PubSchemeChangeToStore(ctx, se, 2, -1, nil, s) + event2 := notifier.NewDropTableEvent(&model.TableInfo{ID: 1001, Name: pmodel.NewCIStr("t2")}) + err = notifier.PubSchemeChangeToStore(ctx, se, 2, -1, event2, s) require.NoError(t, err) - got, err := s.List(ctx, se, 1) require.NoError(t, err) require.Len(t, got, 1) diff --git a/pkg/ddl/notifier/store.go b/pkg/ddl/notifier/store.go index 840dc5e06d..c652fd0052 100644 --- a/pkg/ddl/notifier/store.go +++ b/pkg/ddl/notifier/store.go @@ -16,8 +16,10 @@ package notifier import ( "context" + "encoding/json" "fmt" + "github.com/pingcap/errors" sess "github.com/pingcap/tidb/pkg/ddl/session" ) @@ -44,7 +46,10 @@ type tableStore struct { } func (t *tableStore) Insert(ctx context.Context, s *sess.Session, change *schemaChange) error { - // TODO: fill schema_change after we implement JSON serialization. + event, err := json.Marshal(change.event) + if err != nil { + return errors.Trace(err) + } sql := fmt.Sprintf(` INSERT INTO %s.%s ( ddl_job_id, @@ -53,9 +58,9 @@ func (t *tableStore) Insert(ctx context.Context, s *sess.Session, change *schema processed_by_flag ) VALUES (%d, %d, '%s', 0)`, t.db, t.table, - change.ddlJobID, change.multiSchemaChangeSeq, "{}", + change.ddlJobID, change.multiSchemaChangeSeq, event, ) - _, err := s.Execute(ctx, sql, "ddl_notifier") + _, err = s.Execute(ctx, sql, "ddl_notifier") return err } @@ -88,11 +93,16 @@ func (t *tableStore) List(ctx context.Context, se *sess.Session, limit int) ([]* } ret := make([]*schemaChange, 0, len(rows)) for _, row := range rows { + event := SchemaChangeEvent{} + err = json.Unmarshal(row.GetBytes(2), &event) + if err != nil { + return nil, errors.Trace(err) + } ret = append(ret, &schemaChange{ ddlJobID: row.GetInt64(0), multiSchemaChangeSeq: row.GetInt64(1), - // TODO: fill schema_change after we implement JSON serialization. - processedByFlag: row.GetUint64(3), + event: &event, + processedByFlag: row.GetUint64(3), }) } return ret, nil