tablecodec: support decode common handle bytes to strings (#18953)

* tablecodec: support decode common handle bytes to strings

* address comments

Co-authored-by: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
This commit is contained in:
tangenta
2020-08-04 13:35:48 +08:00
committed by GitHub
parent ef7c197271
commit bd06d239a2
3 changed files with 60 additions and 17 deletions

View File

@ -202,6 +202,25 @@ func (s *testSuite8) TestInsertOnDuplicateKey(c *C) {
tk.MustQuery(`select * from t1 use index(primary)`).Check(testkit.Rows(`1.0000`))
}
func (s *testSuite8) TestClusterIndexInsertOnDuplicateKey(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("drop database if exists cluster_index_duplicate_entry_error;")
tk.MustExec("create database cluster_index_duplicate_entry_error;")
tk.MustExec("use cluster_index_duplicate_entry_error;")
tk.MustExec("set @@tidb_enable_clustered_index = 1")
tk.MustExec("create table t(a char(20), b int, primary key(a));")
tk.MustExec("insert into t values('aa', 1), ('bb', 1);")
_, err := tk.Exec("insert into t values('aa', 2);")
c.Assert(err, ErrorMatches, ".*Duplicate entry 'aa' for.*")
tk.MustExec("drop table t;")
tk.MustExec("create table t(a char(20), b varchar(30), c varchar(10), primary key(a, b, c));")
tk.MustExec("insert into t values ('a', 'b', 'c'), ('b', 'a', 'c');")
_, err = tk.Exec("insert into t values ('a', 'b', 'c');")
c.Assert(err, ErrorMatches, ".*Duplicate entry 'a-b-c' for.*")
}
func (s *testSuite10) TestPaddingCommonHandle(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

View File

@ -201,7 +201,13 @@ func (c *twoPhaseCommitter) extractKeyExistsErr(key kv.Key) error {
_, handle, err := tablecodec.DecodeRecordKey(key)
if err == nil {
return kv.ErrKeyExists.FastGenByArgs(handle.String(), "PRIMARY")
if handle.IsInt() {
return kv.ErrKeyExists.FastGenByArgs(handle.String(), "PRIMARY")
}
values, err := tablecodec.DecodeValuesBytesToStrings(handle.Encoded())
if err == nil {
return kv.ErrKeyExists.FastGenByArgs(strings.Join(values, "-"), "PRIMARY")
}
}
tableID, indexID, indexValues, err := tablecodec.DecodeIndexKey(key)

View File

@ -127,13 +127,19 @@ func DecodeRecordKey(key kv.Key) (tableID int64, handle kv.Handle, err error) {
}
key = key[recordPrefixSepLength:]
var intHandle int64
key, intHandle, err = codec.DecodeInt(key)
if err != nil {
return 0, nil, errors.Trace(err)
if len(key) == 8 {
var intHandle int64
key, intHandle, err = codec.DecodeInt(key)
if err != nil {
return 0, nil, errors.Trace(err)
}
return tableID, kv.IntHandle(intHandle), nil
}
handle = kv.IntHandle(intHandle)
return
h, err := kv.NewCommonHandle(key)
if err != nil {
return 0, nil, errInvalidRecordKey.GenWithStack("invalid record key - %q %v", k, err)
}
return tableID, h, nil
}
// DecodeIndexKey decodes the key and gets the tableID, indexID, indexValues.
@ -145,24 +151,36 @@ func DecodeIndexKey(key kv.Key) (tableID int64, indexID int64, indexValues []str
return 0, 0, nil, errors.Trace(err)
}
if isRecord {
return 0, 0, nil, errInvalidIndexKey.GenWithStack("invalid index key - %q", k)
err = errInvalidIndexKey.GenWithStack("invalid index key - %q", k)
return 0, 0, nil, err
}
indexKey := key[prefixLen+idLen:]
for len(indexKey) > 0 {
// FIXME: Without the schema information, we can only decode the raw kind of
// the column. For instance, MysqlTime is internally saved as uint64.
remain, d, e := codec.DecodeOne(indexKey)
indexValues, err = DecodeValuesBytesToStrings(indexKey)
if err != nil {
err = errInvalidIndexKey.GenWithStack("invalid index key - %q %v", k, err)
return 0, 0, nil, err
}
return tableID, indexID, indexValues, nil
}
// DecodeValuesBytesToStrings decode the raw bytes to strings for each columns.
// FIXME: Without the schema information, we can only decode the raw kind of
// the column. For instance, MysqlTime is internally saved as uint64.
func DecodeValuesBytesToStrings(b []byte) ([]string, error) {
var datumValues []string
for len(b) > 0 {
remain, d, e := codec.DecodeOne(b)
if e != nil {
return 0, 0, nil, errInvalidIndexKey.GenWithStack("invalid index key - %q %v", k, e)
return nil, e
}
str, e1 := d.ToString()
if e1 != nil {
return 0, 0, nil, errInvalidIndexKey.GenWithStack("invalid index key - %q %v", k, e1)
return nil, e
}
indexValues = append(indexValues, str)
indexKey = remain
datumValues = append(datumValues, str)
b = remain
}
return tableID, indexID, indexValues, nil
return datumValues, nil
}
// DecodeMetaKey decodes the key and get the meta key and meta field.