diff --git a/sessionctx/binloginfo/binloginfo_test.go b/sessionctx/binloginfo/binloginfo_test.go index beca401723..b3400ffb15 100644 --- a/sessionctx/binloginfo/binloginfo_test.go +++ b/sessionctx/binloginfo/binloginfo_test.go @@ -144,24 +144,38 @@ func (s *testBinlogSuite) TestBinlog(c *C) { tk.MustExec("update local_binlog set name = 'xyz' where id = 2") prewriteVal = getLatestBinlogPrewriteValue(c, pump) - expected = [][]types.Datum{ + oldRow := [][]types.Datum{ + {types.NewIntDatum(2), types.NewStringDatum("cde")}, + } + newRow := [][]types.Datum{ {types.NewIntDatum(2), types.NewStringDatum("xyz")}, } - gotRows = mutationRowsToRows(c, prewriteVal.Mutations[0].UpdatedRows, 2, 4) - c.Assert(gotRows, DeepEquals, expected) + gotRows = mutationRowsToRows(c, prewriteVal.Mutations[0].UpdatedRows, 1, 3) + c.Assert(gotRows, DeepEquals, oldRow) + + gotRows = mutationRowsToRows(c, prewriteVal.Mutations[0].UpdatedRows, 5, 7) + c.Assert(gotRows, DeepEquals, newRow) tk.MustExec("delete from local_binlog where id = 1") prewriteVal = getLatestBinlogPrewriteValue(c, pump) - c.Assert(prewriteVal.Mutations[0].DeletedIds, DeepEquals, []int64{1}) + gotRows = mutationRowsToRows(c, prewriteVal.Mutations[0].DeletedRows, 1, 3) + expected = [][]types.Datum{ + {types.NewIntDatum(1), types.NewStringDatum("abc")}, + } + c.Assert(gotRows, DeepEquals, expected) // Test table primary key is not integer. tk.MustExec("create table local_binlog2 (name varchar(64) primary key, age int)") tk.MustExec("insert local_binlog2 values ('abc', 16), ('def', 18)") tk.MustExec("delete from local_binlog2 where name = 'def'") prewriteVal = getLatestBinlogPrewriteValue(c, pump) - c.Assert(prewriteVal.Mutations[0].Sequence[0], Equals, binlog.MutationType_DeletePK) - _, deletedPK, _ := codec.DecodeOne(prewriteVal.Mutations[0].DeletedPks[0]) - c.Assert(deletedPK.GetString(), Equals, "def") + c.Assert(prewriteVal.Mutations[0].Sequence[0], Equals, binlog.MutationType_DeleteRow) + + expected = [][]types.Datum{ + {types.NewStringDatum("def"), types.NewIntDatum(18)}, + } + gotRows = mutationRowsToRows(c, prewriteVal.Mutations[0].DeletedRows, 1, 3) + c.Assert(gotRows, DeepEquals, expected) // Test Table don't have primary key. tk.MustExec("create table local_binlog3 (c1 int, c2 int)") @@ -193,7 +207,7 @@ func (s *testBinlogSuite) TestBinlog(c *C) { tk.MustExec("commit") prewriteVal = getLatestBinlogPrewriteValue(c, pump) c.Assert(prewriteVal.Mutations[0].Sequence, DeepEquals, []binlog.MutationType{ - binlog.MutationType_DeleteID, + binlog.MutationType_DeleteRow, binlog.MutationType_Insert, binlog.MutationType_Update, }) @@ -293,10 +307,6 @@ func mutationRowsToRows(c *C, mutationRows [][]byte, firstColumn, secondColumn i datums, err := codec.Decode(mutationRow, 5) c.Assert(err, IsNil) for i := range datums { - if i != firstColumn && i != secondColumn { - // Column ID or handle - c.Assert(datums[i].GetInt64(), Greater, int64(0)) - } if datums[i].Kind() == types.KindBytes { datums[i].SetBytesAsString(datums[i].GetBytes()) } diff --git a/table/tables/tables.go b/table/tables/tables.go index 0d635d5a70..5e0efa36c9 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -491,70 +491,28 @@ func (t *Table) RemoveRecord(ctx context.Context, h int64, r []types.Datum) erro return errors.Trace(err) } if shouldWriteBinlog(ctx) { - err = t.addDeleteBinlog(ctx, h, r) + err = t.addDeleteBinlog(ctx, r) } return errors.Trace(err) } func (t *Table) addUpdateBinlog(ctx context.Context, h int64, old []types.Datum, newValue []byte, colIDs []int64) error { - mutation := t.getMutation(ctx) - hasPK := false - if t.meta.PKIsHandle { - hasPK = true - } else { - for _, idx := range t.meta.Indices { - if idx.Primary { - hasPK = true - break - } - } - } var bin []byte - if hasPK { - handleData, _ := codec.EncodeValue(nil, types.NewIntDatum(h)) - bin = append(handleData, newValue...) - } else { - oldData, err := tablecodec.EncodeRow(old, colIDs) - if err != nil { - return errors.Trace(err) - } - bin = append(oldData, newValue...) + oldData, err := tablecodec.EncodeRow(old, colIDs) + if err != nil { + return errors.Trace(err) } + bin = append(oldData, newValue...) + mutation := t.getMutation(ctx) mutation.UpdatedRows = append(mutation.UpdatedRows, bin) mutation.Sequence = append(mutation.Sequence, binlog.MutationType_Update) return nil } -func (t *Table) addDeleteBinlog(ctx context.Context, h int64, r []types.Datum) error { +func (t *Table) addDeleteBinlog(ctx context.Context, r []types.Datum) error { mutation := t.getMutation(ctx) - if t.meta.PKIsHandle { - mutation.DeletedIds = append(mutation.DeletedIds, h) - mutation.Sequence = append(mutation.Sequence, binlog.MutationType_DeleteID) - return nil - } - - var primaryIdx *model.IndexInfo - for _, idx := range t.meta.Indices { - if idx.Primary { - primaryIdx = idx - break - } - } var data []byte var err error - if primaryIdx != nil { - indexedValues := make([]types.Datum, len(primaryIdx.Columns)) - for i := range indexedValues { - indexedValues[i] = r[primaryIdx.Columns[i].Offset] - } - data, err = codec.EncodeKey(nil, indexedValues...) - if err != nil { - return errors.Trace(err) - } - mutation.DeletedPks = append(mutation.DeletedPks, data) - mutation.Sequence = append(mutation.Sequence, binlog.MutationType_DeletePK) - return nil - } colIDs := make([]int64, len(t.Cols())) for i, col := range t.Cols() { colIDs[i] = col.ID