binlog: always include old row for update and delete (#2624)
Old row value is required for translating to MySQL binlog.
This commit is contained in:
@ -144,24 +144,38 @@ func (s *testBinlogSuite) TestBinlog(c *C) {
|
||||
|
||||
tk.MustExec("update local_binlog set name = 'xyz' where id = 2")
|
||||
prewriteVal = getLatestBinlogPrewriteValue(c, pump)
|
||||
expected = [][]types.Datum{
|
||||
oldRow := [][]types.Datum{
|
||||
{types.NewIntDatum(2), types.NewStringDatum("cde")},
|
||||
}
|
||||
newRow := [][]types.Datum{
|
||||
{types.NewIntDatum(2), types.NewStringDatum("xyz")},
|
||||
}
|
||||
gotRows = mutationRowsToRows(c, prewriteVal.Mutations[0].UpdatedRows, 2, 4)
|
||||
c.Assert(gotRows, DeepEquals, expected)
|
||||
gotRows = mutationRowsToRows(c, prewriteVal.Mutations[0].UpdatedRows, 1, 3)
|
||||
c.Assert(gotRows, DeepEquals, oldRow)
|
||||
|
||||
gotRows = mutationRowsToRows(c, prewriteVal.Mutations[0].UpdatedRows, 5, 7)
|
||||
c.Assert(gotRows, DeepEquals, newRow)
|
||||
|
||||
tk.MustExec("delete from local_binlog where id = 1")
|
||||
prewriteVal = getLatestBinlogPrewriteValue(c, pump)
|
||||
c.Assert(prewriteVal.Mutations[0].DeletedIds, DeepEquals, []int64{1})
|
||||
gotRows = mutationRowsToRows(c, prewriteVal.Mutations[0].DeletedRows, 1, 3)
|
||||
expected = [][]types.Datum{
|
||||
{types.NewIntDatum(1), types.NewStringDatum("abc")},
|
||||
}
|
||||
c.Assert(gotRows, DeepEquals, expected)
|
||||
|
||||
// Test table primary key is not integer.
|
||||
tk.MustExec("create table local_binlog2 (name varchar(64) primary key, age int)")
|
||||
tk.MustExec("insert local_binlog2 values ('abc', 16), ('def', 18)")
|
||||
tk.MustExec("delete from local_binlog2 where name = 'def'")
|
||||
prewriteVal = getLatestBinlogPrewriteValue(c, pump)
|
||||
c.Assert(prewriteVal.Mutations[0].Sequence[0], Equals, binlog.MutationType_DeletePK)
|
||||
_, deletedPK, _ := codec.DecodeOne(prewriteVal.Mutations[0].DeletedPks[0])
|
||||
c.Assert(deletedPK.GetString(), Equals, "def")
|
||||
c.Assert(prewriteVal.Mutations[0].Sequence[0], Equals, binlog.MutationType_DeleteRow)
|
||||
|
||||
expected = [][]types.Datum{
|
||||
{types.NewStringDatum("def"), types.NewIntDatum(18)},
|
||||
}
|
||||
gotRows = mutationRowsToRows(c, prewriteVal.Mutations[0].DeletedRows, 1, 3)
|
||||
c.Assert(gotRows, DeepEquals, expected)
|
||||
|
||||
// Test Table don't have primary key.
|
||||
tk.MustExec("create table local_binlog3 (c1 int, c2 int)")
|
||||
@ -193,7 +207,7 @@ func (s *testBinlogSuite) TestBinlog(c *C) {
|
||||
tk.MustExec("commit")
|
||||
prewriteVal = getLatestBinlogPrewriteValue(c, pump)
|
||||
c.Assert(prewriteVal.Mutations[0].Sequence, DeepEquals, []binlog.MutationType{
|
||||
binlog.MutationType_DeleteID,
|
||||
binlog.MutationType_DeleteRow,
|
||||
binlog.MutationType_Insert,
|
||||
binlog.MutationType_Update,
|
||||
})
|
||||
@ -293,10 +307,6 @@ func mutationRowsToRows(c *C, mutationRows [][]byte, firstColumn, secondColumn i
|
||||
datums, err := codec.Decode(mutationRow, 5)
|
||||
c.Assert(err, IsNil)
|
||||
for i := range datums {
|
||||
if i != firstColumn && i != secondColumn {
|
||||
// Column ID or handle
|
||||
c.Assert(datums[i].GetInt64(), Greater, int64(0))
|
||||
}
|
||||
if datums[i].Kind() == types.KindBytes {
|
||||
datums[i].SetBytesAsString(datums[i].GetBytes())
|
||||
}
|
||||
|
||||
@ -491,70 +491,28 @@ func (t *Table) RemoveRecord(ctx context.Context, h int64, r []types.Datum) erro
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if shouldWriteBinlog(ctx) {
|
||||
err = t.addDeleteBinlog(ctx, h, r)
|
||||
err = t.addDeleteBinlog(ctx, r)
|
||||
}
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
func (t *Table) addUpdateBinlog(ctx context.Context, h int64, old []types.Datum, newValue []byte, colIDs []int64) error {
|
||||
mutation := t.getMutation(ctx)
|
||||
hasPK := false
|
||||
if t.meta.PKIsHandle {
|
||||
hasPK = true
|
||||
} else {
|
||||
for _, idx := range t.meta.Indices {
|
||||
if idx.Primary {
|
||||
hasPK = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
var bin []byte
|
||||
if hasPK {
|
||||
handleData, _ := codec.EncodeValue(nil, types.NewIntDatum(h))
|
||||
bin = append(handleData, newValue...)
|
||||
} else {
|
||||
oldData, err := tablecodec.EncodeRow(old, colIDs)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
bin = append(oldData, newValue...)
|
||||
oldData, err := tablecodec.EncodeRow(old, colIDs)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
bin = append(oldData, newValue...)
|
||||
mutation := t.getMutation(ctx)
|
||||
mutation.UpdatedRows = append(mutation.UpdatedRows, bin)
|
||||
mutation.Sequence = append(mutation.Sequence, binlog.MutationType_Update)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Table) addDeleteBinlog(ctx context.Context, h int64, r []types.Datum) error {
|
||||
func (t *Table) addDeleteBinlog(ctx context.Context, r []types.Datum) error {
|
||||
mutation := t.getMutation(ctx)
|
||||
if t.meta.PKIsHandle {
|
||||
mutation.DeletedIds = append(mutation.DeletedIds, h)
|
||||
mutation.Sequence = append(mutation.Sequence, binlog.MutationType_DeleteID)
|
||||
return nil
|
||||
}
|
||||
|
||||
var primaryIdx *model.IndexInfo
|
||||
for _, idx := range t.meta.Indices {
|
||||
if idx.Primary {
|
||||
primaryIdx = idx
|
||||
break
|
||||
}
|
||||
}
|
||||
var data []byte
|
||||
var err error
|
||||
if primaryIdx != nil {
|
||||
indexedValues := make([]types.Datum, len(primaryIdx.Columns))
|
||||
for i := range indexedValues {
|
||||
indexedValues[i] = r[primaryIdx.Columns[i].Offset]
|
||||
}
|
||||
data, err = codec.EncodeKey(nil, indexedValues...)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
mutation.DeletedPks = append(mutation.DeletedPks, data)
|
||||
mutation.Sequence = append(mutation.Sequence, binlog.MutationType_DeletePK)
|
||||
return nil
|
||||
}
|
||||
colIDs := make([]int64, len(t.Cols()))
|
||||
for i, col := range t.Cols() {
|
||||
colIDs[i] = col.ID
|
||||
|
||||
Reference in New Issue
Block a user