From 1662e7ed68fba0667c5efce323b83d211dda86db Mon Sep 17 00:00:00 2001
From: Ewan Chou
Date: Mon, 21 Mar 2016 17:25:50 +0800
Subject: [PATCH] xapi: add xapi functions.

---
 kv/key.go                          |  23 ++
 kv/kv.go                           |   2 +
 util/codec/codec.go                |  91 ++++----
 xapi/tablecodec/tablecodec.go      | 344 +++++++++++++++++++++++++++++
 xapi/tablecodec/tablecodec_test.go |  30 +++
 xapi/xapi.go                       | 136 ++++++++++++
 6 files changed, 586 insertions(+), 40 deletions(-)
 create mode 100644 xapi/tablecodec/tablecodec.go
 create mode 100644 xapi/tablecodec/tablecodec_test.go
 create mode 100644 xapi/xapi.go

diff --git a/kv/key.go b/kv/key.go
index fed902ff20..a03a027996 100644
--- a/kv/key.go
+++ b/kv/key.go
@@ -26,6 +26,29 @@ func (k Key) Next() Key {
 	return buf
 }
 
+// PartialNext returns a copy of k incremented by one, treating the bytes as a
+// big-endian number whose length does not change.
+// For a key composed of two columns, Next yields a key that still has the
+// same first column value, whereas PartialNext yields a key whose first
+// column value is different. The key encoding has to guarantee that the next
+// distinct value is exactly as long as the original value.
+func (k Key) PartialNext() Key {
+	next := make([]byte, len(k))
+	copy(next, k)
+	// Increment the key as a big-endian number, carrying towards the front.
+	for pos := len(next) - 1; pos >= 0; pos-- {
+		next[pos]++
+		if next[pos] != 0 {
+			// No carry out of this byte, so the increment is complete.
+			return next
+		}
+	}
+	// Every byte wrapped around (the key was empty or all 0xFF), so a
+	// same-length successor does not exist: return k with a zero appended.
+	copy(next, k)
+	return append(next, 0)
+}
+
 // Cmp returns the comparison result of two key.
 // The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
 func (k Key) Cmp(another Key) int {
diff --git a/kv/kv.go b/kv/kv.go
index fb75e4944a..dcd22039af 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -125,6 +125,8 @@ type Response interface {
 	// Next returns a resultSubset from a single storage unit.
 	// When full result set is returned, nil is returned.
 	Next() (resultSubset io.ReadCloser, err error)
+	// Close response.
+	Close() error
 }
 
 // Snapshot defines the interface for the snapshot fetched from KV store.
diff --git a/util/codec/codec.go b/util/codec/codec.go index bf100fc186..782d9c9c92 100644 --- a/util/codec/codec.go +++ b/util/codec/codec.go @@ -108,52 +108,13 @@ func Decode(b []byte) ([]types.Datum, error) { } var ( - flag byte err error values = make([]types.Datum, 0, 1) ) for len(b) > 0 { - flag = b[0] - b = b[1:] var d types.Datum - switch flag { - case intFlag: - var v int64 - b, v, err = DecodeInt(b) - d.SetInt64(v) - case uintFlag: - var v uint64 - b, v, err = DecodeUint(b) - d.SetUint64(v) - case floatFlag: - var v float64 - b, v, err = DecodeFloat(b) - d.SetFloat64(v) - case bytesFlag: - var v []byte - b, v, err = DecodeBytes(b) - d.SetBytes(v) - case compactBytesFlag: - var v []byte - b, v, err = DecodeCompactBytes(b) - d.SetBytes(v) - case decimalFlag: - var v mysql.Decimal - b, v, err = DecodeDecimal(b) - d.SetValue(v) - case durationFlag: - var r int64 - b, r, err = DecodeInt(b) - if err == nil { - // use max fsp, let outer to do round manually. - v := mysql.Duration{Duration: time.Duration(r), Fsp: mysql.MaxFsp} - d.SetValue(v) - } - case nilFlag: - default: - return nil, errors.Errorf("invalid encoded key flag %v", flag) - } + b, d, err = DecodeOne(b) if err != nil { return nil, errors.Trace(err) } @@ -163,3 +124,53 @@ func Decode(b []byte) ([]types.Datum, error) { return values, nil } + +// DecodeOne decodes one datum from a byte slice generated with EncodeKey or EncodeValue. 
+func DecodeOne(b []byte) (remain []byte, d types.Datum, err error) {
+	if len(b) == 0 {
+		return nil, d, errors.New("invalid encoded key")
+	}
+	// The leading byte is the type flag; the encoded payload follows it.
+	flag := b[0]
+	remain = b[1:]
+	switch flag {
+	case intFlag:
+		var i int64
+		remain, i, err = DecodeInt(remain)
+		d.SetInt64(i)
+	case uintFlag:
+		var u uint64
+		remain, u, err = DecodeUint(remain)
+		d.SetUint64(u)
+	case floatFlag:
+		var f float64
+		remain, f, err = DecodeFloat(remain)
+		d.SetFloat64(f)
+	case bytesFlag:
+		var raw []byte
+		remain, raw, err = DecodeBytes(remain)
+		d.SetBytes(raw)
+	case compactBytesFlag:
+		var raw []byte
+		remain, raw, err = DecodeCompactBytes(remain)
+		d.SetBytes(raw)
+	case decimalFlag:
+		var dec mysql.Decimal
+		remain, dec, err = DecodeDecimal(remain)
+		d.SetValue(dec)
+	case durationFlag:
+		var nanos int64
+		remain, nanos, err = DecodeInt(remain)
+		if err == nil {
+			// Use the maximum fsp here and leave rounding to the caller.
+			d.SetValue(mysql.Duration{Duration: time.Duration(nanos), Fsp: mysql.MaxFsp})
+		}
+	case nilFlag:
+		// A nil value carries no payload; d keeps its zero value.
+	default:
+		return remain, d, errors.Errorf("invalid encoded key flag %v", flag)
+	}
+	if err != nil {
+		return remain, d, errors.Trace(err)
+	}
+	return remain, d, nil
+}
diff --git a/xapi/tablecodec/tablecodec.go b/xapi/tablecodec/tablecodec.go
new file mode 100644
index 0000000000..78c9aa2b3f
--- /dev/null
+++ b/xapi/tablecodec/tablecodec.go
@@ -0,0 +1,344 @@
+package tablecodec
+
+import (
+	"bytes"
+	"sort"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/juju/errors"
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/model"
+	"github.com/pingcap/tidb/mysql"
+	"github.com/pingcap/tidb/util/codec"
+	"github.com/pingcap/tidb/util/types"
+	"github.com/pingcap/tidb/xapi/tipb"
+)
+
+var (
+	tablePrefix     = []byte{'t'}
+	recordPrefixSep = []byte("_r")
+	indexPrefixSep  = []byte("_i")
+)
+
+const (
+	idLen           = 8
+	prefixLen       = 1 + idLen /*tableID*/ + 2
+	recordRowKeyLen = prefixLen + idLen /*handle*/
+)
+
+// EncodeRowKey builds the record key of a row: the table record prefix for
+// tableID followed by the already encoded handle.
+func EncodeRowKey(tableID int64, encodedHandle []byte) kv.Key {
+	key := appendTableRecordPrefix(make([]byte, 0, recordRowKeyLen), tableID)
+	return append(key, encodedHandle...)
+}
+
+// EncodeColumnKey builds the key of a single column of a row: the row key
+// followed by the encoded columnID.
+func EncodeColumnKey(tableID int64, encodedHandle []byte, columnID int64) kv.Key {
+	key := make([]byte, 0, recordRowKeyLen+idLen)
+	key = appendTableRecordPrefix(key, tableID)
+	key = append(key, encodedHandle...)
+	return codec.EncodeInt(key, columnID)
+}
+
+// DecodeRowKey extracts the row handle from a record key.
+// Whatever follows the handle in the key (such as a column ID) is ignored.
+func DecodeRowKey(key kv.Key) (handle int64, err error) {
+	if !key.HasPrefix(tablePrefix) {
+		return 0, errors.Errorf("invalid record key - %q", key)
+	}
+	rest := key[len(tablePrefix):]
+
+	// The table ID has to be skipped, its value is not needed.
+	rest, _, err = codec.DecodeInt(rest)
+	if err != nil {
+		return 0, errors.Trace(err)
+	}
+
+	if !rest.HasPrefix(recordPrefixSep) {
+		return 0, errors.Errorf("invalid record key - %q", key)
+	}
+
+	_, handle, err = codec.DecodeInt(rest[len(recordPrefixSep):])
+	if err != nil {
+		return 0, errors.Trace(err)
+	}
+	return handle, nil
+}
+
+// DecodeValues decodes data into datums and converts the i-th datum to the
+// column type described by fts[i]. It fails when data holds more values than
+// there are field types.
+func DecodeValues(data []byte, fts []*types.FieldType) ([]types.Datum, error) {
+	datums, err := codec.Decode(data)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if len(fts) < len(datums) {
+		return nil, errors.Errorf("invalid column count %d is less than value count %d", len(fts), len(datums))
+	}
+	for i, raw := range datums {
+		converted, err := unflatten(raw, fts[i])
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		datums[i] = converted
+	}
+	return datums, nil
+}
+
+// unflatten converts a raw datum to a column datum.
+// A null datum is returned untouched, as is a datum whose field type needs
+// no conversion.
+func unflatten(datum types.Datum, ft *types.FieldType) (types.Datum, error) {
+	if datum.Kind() == types.KindNull {
+		return datum, nil
+	}
+	switch ft.Tp {
+	case mysql.TypeTiny, mysql.TypeShort, mysql.TypeYear, mysql.TypeInt24,
+		mysql.TypeLong, mysql.TypeLonglong, mysql.TypeDouble, mysql.TypeTinyBlob,
+		mysql.TypeMediumBlob, mysql.TypeBlob, mysql.TypeLongBlob, mysql.TypeVarchar,
+		mysql.TypeString:
+		// The raw datum already is the column datum.
+	case mysql.TypeFloat:
+		datum.SetFloat32(float32(datum.GetFloat64()))
+	case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp:
+		t := mysql.Time{Type: ft.Tp, Fsp: ft.Decimal}
+		if err := t.Unmarshal(datum.GetBytes()); err != nil {
+			return datum, errors.Trace(err)
+		}
+		datum.SetValue(t)
+	case mysql.TypeDuration:
+		datum.SetValue(mysql.Duration{Duration: time.Duration(datum.GetInt64())})
+	case mysql.TypeNewDecimal:
+		dec, err := mysql.ParseDecimal(datum.GetString())
+		if err != nil {
+			return datum, errors.Trace(err)
+		}
+		datum.SetValue(dec)
+	case mysql.TypeEnum:
+		enum, err := mysql.ParseEnumValue(ft.Elems, datum.GetUint64())
+		if err != nil {
+			return datum, errors.Trace(err)
+		}
+		datum.SetValue(enum)
+	case mysql.TypeSet:
+		set, err := mysql.ParseSetValue(ft.Elems, datum.GetUint64())
+		if err != nil {
+			return datum, errors.Trace(err)
+		}
+		datum.SetValue(set)
+	case mysql.TypeBit:
+		datum.SetValue(mysql.Bit{Value: datum.GetUint64(), Width: ft.Flen})
+	}
+	return datum, nil
+}
+
+// EncodeIndexSeekKey builds an index seek key: the table index prefix for
+// tableID followed by the already encoded index value.
+func EncodeIndexSeekKey(tableID int64, encodedValue []byte) kv.Key {
+	key := appendTableIndexPrefix(make([]byte, 0, prefixLen+len(encodedValue)), tableID)
+	return append(key, encodedValue...)
+}
+
+// IndexRowData extracts the row data and handle in an index key.
+// If the index is unique, handle would be nil.
+// An error is returned when key is shorter than the table index prefix or
+// when fewer than columnCount values can be decoded from it.
+func IndexRowData(key kv.Key, columnCount int) (data, handle []byte, err error) {
+	// Guard the slicing below: a short key used to panic here.
+	if len(key) < prefixLen {
+		return nil, nil, errors.Errorf("invalid index key - %q", key)
+	}
+	b := key[prefixLen:]
+	// The index key may have primary key appended to the end, we decode the column values
+	// only to get the column values length.
+	for ; columnCount > 0; columnCount-- {
+		b, _, err = codec.DecodeOne(b)
+		if err != nil {
+			return nil, nil, errors.Trace(err)
+		}
+	}
+	data = key[prefixLen : len(key)-len(b)]
+	// Keep the documented contract: with nothing left after the column
+	// values, the handle is nil rather than an empty slice.
+	if len(b) > 0 {
+		handle = b
+	}
+	return data, handle, nil
+}
+
+// Record prefix is "t[tableID]_r".
+func appendTableRecordPrefix(buf []byte, tableID int64) []byte {
+	buf = append(buf, tablePrefix...)
+	buf = codec.EncodeInt(buf, tableID)
+	buf = append(buf, recordPrefixSep...)
+	return buf
+}
+
+// Index prefix is "t[tableID]_i".
+func appendTableIndexPrefix(buf []byte, tableID int64) []byte {
+	buf = append(buf, tablePrefix...)
+	buf = codec.EncodeInt(buf, tableID)
+	buf = append(buf, indexPrefixSep...)
+	return buf
+}
+
+// int64Slice implements sort.Interface for ascending order.
+type int64Slice []int64
+
+func (p int64Slice) Len() int           { return len(p) }
+func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
+func (p int64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+
+// SortInt64Slice sorts an int64 slice in place, in ascending order.
+func SortInt64Slice(s []int64) {
+	sort.Sort(int64Slice(s))
+}
+
+// columnToProto converts a model.ColumnInfo to a tipb.ColumnInfo.
+// NOTE(review): Flag is not populated here although ProtoColumnsToFieldTypes
+// reads it through GetFlag - confirm whether it should be set.
+func columnToProto(c *model.ColumnInfo) *tipb.ColumnInfo {
+	pc := &tipb.ColumnInfo{
+		ColumnId:  proto.Int64(c.ID),
+		Collation: proto.Int32(collationToProto(c.FieldType.Collate)),
+		ColumnLen: proto.Int32(int32(c.FieldType.Flen)),
+		Decimal:   proto.Int32(int32(c.FieldType.Decimal)),
+		Elems:     c.Elems,
+	}
+	t := int32(c.FieldType.Tp)
+	pc.Tp = &t
+	return pc
+}
+
+// collationToProto maps a collation name to its ID and falls back to
+// mysql.DefaultCollationID for names missing from mysql.CollationNames.
+func collationToProto(c string) int32 {
+	v, ok := mysql.CollationNames[c]
+	if ok {
+		return int32(v)
+	}
+	return int32(mysql.DefaultCollationID)
+}
+
+// TableToProto converts a model.TableInfo to a tipb.TableInfo.
+// Every column is converted with columnToProto; PkHandle is true only for
+// the primary key column of a table whose primary key is the row handle.
+func TableToProto(t *model.TableInfo) *tipb.TableInfo {
+	cols := make([]*tipb.ColumnInfo, 0, len(t.Columns))
+	for _, c := range t.Columns {
+		col := columnToProto(c)
+		col.PkHandle = proto.Bool(t.PKIsHandle && mysql.HasPriKeyFlag(c.Flag))
+		cols = append(cols, col)
+	}
+	return &tipb.TableInfo{
+		TableId: proto.Int64(t.ID),
+		Columns: cols,
+	}
+}
+
+// ProtoColumnsToFieldTypes converts a tipb column info slice to a FieldType
+// slice; the result has one entry per input column, in the same order.
+func ProtoColumnsToFieldTypes(pColumns []*tipb.ColumnInfo) []*types.FieldType {
+	fields := make([]*types.FieldType, len(pColumns))
+	for i, v := range pColumns {
+		field := new(types.FieldType)
+		field.Tp = byte(v.GetTp())
+		field.Collate = mysql.Collations[byte(v.GetCollation())]
+		field.Decimal = int(v.GetDecimal())
+		field.Flen = int(v.GetColumnLen())
+		field.Flag = uint(v.GetFlag())
+		field.Elems = v.GetElems()
+		fields[i] = field
+	}
+	return fields
+}
+
+// IndexToProto converts a model.IndexInfo to a tipb.IndexInfo.
+// Only the columns that belong to the index are emitted, in index order.
+func IndexToProto(t *model.TableInfo, idx *model.IndexInfo) *tipb.IndexInfo {
+	cols := make([]*tipb.ColumnInfo, 0, len(idx.Columns))
+	for _, ic := range idx.Columns {
+		// An index column only references a table column; the full
+		// column definition lives in the table info at ic.Offset.
+		// NOTE(review): assumes Offset is a valid position in t.Columns.
+		cols = append(cols, columnToProto(t.Columns[ic.Offset]))
+	}
+	return &tipb.IndexInfo{
+		TableId: proto.Int64(t.ID),
+		IndexId: proto.Int64(idx.ID),
+		Unique:  proto.Bool(idx.Unique),
+		Columns: cols,
+	}
+}
+
+// EncodeTableRanges encodes table ranges into kv.KeyRanges.
+// Ranges and points are encoded as row keys of table tid; the result is
+// sorted by start key.
+func EncodeTableRanges(tid int64, rans []*tipb.KeyRange, points [][]byte) []kv.KeyRange {
+	return encodeKeyRanges(EncodeRowKey, tid, rans, points)
+}
+
+// EncodeIndexRanges encodes index ranges and points into kv.KeyRanges using
+// index seek keys of table tid; the result is sorted by start key.
+func EncodeIndexRanges(tid int64, rans []*tipb.KeyRange, points [][]byte) []kv.KeyRange {
+	return encodeKeyRanges(EncodeIndexSeekKey, tid, rans, points)
+}
+
+// encodeKeyRanges turns ranges and points into kv.KeyRanges sorted by start
+// key, building every key with encode.
+func encodeKeyRanges(encode func(int64, []byte) kv.Key, tid int64, rans []*tipb.KeyRange, points [][]byte) []kv.KeyRange {
+	keyRanges := make([]kv.KeyRange, 0, len(rans)+len(points))
+	for _, r := range rans {
+		keyRanges = append(keyRanges, kv.KeyRange{
+			StartKey: encode(tid, r.Low),
+			EndKey:   encode(tid, r.High),
+		})
+	}
+	for _, point := range points {
+		// A point becomes the range from its key up to the key's PartialNext.
+		start := encode(tid, point)
+		keyRanges = append(keyRanges, kv.KeyRange{
+			StartKey: start,
+			EndKey:   start.PartialNext(),
+		})
+	}
+	sortKeyRange(keyRanges)
+	return keyRanges
+}
+
+// sortKeyRange sorts ranges in place by ascending start key.
+func sortKeyRange(ranges []kv.KeyRange) {
+	sort.Sort(&keyRangeSorter{ranges: ranges})
+}
+
+// keyRangeSorter implements sort.Interface, ordering ranges by start key.
+type keyRangeSorter struct {
+	ranges []kv.KeyRange
+}
+
+func (r *keyRangeSorter) Len() int {
+	return len(r.ranges)
+}
+
+func (r *keyRangeSorter) Less(i, j int) bool {
+	return bytes.Compare(r.ranges[i].StartKey, r.ranges[j].StartKey) < 0
+}
+
+func (r *keyRangeSorter) Swap(i, j int) {
+	r.ranges[i], r.ranges[j] = r.ranges[j], r.ranges[i]
+}
diff --git a/xapi/tablecodec/tablecodec_test.go b/xapi/tablecodec/tablecodec_test.go
new file mode 100644
index 0000000000..50be5c0133
--- /dev/null
+++ b/xapi/tablecodec/tablecodec_test.go
@@ -0,0 +1,30 @@
+package tablecodec
+
+import (
+	"testing"
+
+	. "github.com/pingcap/check"
+	"github.com/pingcap/tidb/util/codec"
+)
+
+func TestT(t *testing.T) {
+	TestingT(t)
+}
+
+var _ = Suite(&tableCodecSuite{})
+
+type tableCodecSuite struct{}
+
+// TODO: add more tests.
+func (s *tableCodecSuite) TestTableCodec(c *C) {
+	encodedHandle := codec.EncodeInt(nil, 2)
+	// A row key and a column key of the same row decode to the same handle.
+	keys := [][]byte{
+		EncodeRowKey(1, encodedHandle),
+		EncodeColumnKey(1, encodedHandle, 3),
+	}
+	for _, key := range keys {
+		h, err := DecodeRowKey(key)
+		c.Assert(err, IsNil)
+		c.Assert(h, Equals, int64(2))
+	}
+}
diff --git a/xapi/xapi.go b/xapi/xapi.go
new file mode 100644
index 0000000000..43a0c128a3
--- /dev/null
+++ b/xapi/xapi.go
@@ -0,0 +1,136 @@
+package xapi
+
+import (
+	"io"
+	"io/ioutil"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/juju/errors"
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/util/codec"
+	"github.com/pingcap/tidb/util/types"
+	"github.com/pingcap/tidb/xapi/tablecodec"
+	"github.com/pingcap/tidb/xapi/tipb"
+)
+
+// SelectResult is used to get response rows from SelectRequest.
+type SelectResult struct {
+	// fields are the column types used to decode row values.
+	fields []*types.FieldType
+	// resp is the underlying response the sub results are read from.
+	resp kv.Response
+}
+
+// Next returns the next sub result of the response.
+// A nil sub result with a nil error means the response has no more subsets.
+func (r *SelectResult) Next() (subResult *SubResult, err error) {
+	reader, err := r.resp.Next()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if reader == nil {
+		return nil, nil
+	}
+	return &SubResult{fields: r.fields, reader: reader}, nil
+}
+
+// Close closes SelectResult by closing the underlying response.
+func (r *SelectResult) Close() error {
+	return r.resp.Close()
+}
+
+// SubResult represents a subset of select result.
+type SubResult struct {
+	// fields are the column types used to decode row values.
+	fields []*types.FieldType
+	// reader supplies the serialized tipb.SelectResponse.
+	reader io.ReadCloser
+	// resp is the decoded response; nil until the reader has been read.
+	resp *tipb.SelectResponse
+	// cursor is the index of the next row of resp to return.
+	cursor int
+}
+
+// Next returns the next row of the sub result.
+// If no more row to return, data would be nil.
+// The first call reads the whole payload from the reader, closes the reader
+// and decodes the payload; later calls only walk the decoded rows.
+// A failed read or decode is reported again on every later call instead of
+// being mistaken for the end of the data.
+func (r *SubResult) Next() (handle int64, data []types.Datum, err error) {
+	if r.resp == nil {
+		if err = r.load(); err != nil {
+			return 0, nil, errors.Trace(err)
+		}
+	}
+	if r.cursor >= len(r.resp.Rows) {
+		return 0, nil, nil
+	}
+	row := r.resp.Rows[r.cursor]
+	data, err = tablecodec.DecodeValues(row.Data, r.fields)
+	if err != nil {
+		return 0, nil, errors.Trace(err)
+	}
+	_, handle, err = codec.DecodeInt(row.GetHandle())
+	if err != nil {
+		return 0, nil, errors.Trace(err)
+	}
+	// Advance only after the row decoded completely.
+	r.cursor++
+	return handle, data, nil
+}
+
+// load reads the serialized response from the reader, closes the reader and
+// decodes the payload into r.resp.
+func (r *SubResult) load() error {
+	if r.reader == nil {
+		return errors.New("sub result reader is closed")
+	}
+	b, err := ioutil.ReadAll(r.reader)
+	// The reader is finished whether or not the read succeeded. Its Close
+	// error is deliberately dropped: nothing more is read from it.
+	_ = r.reader.Close()
+	r.reader = nil
+	if err != nil {
+		return errors.Trace(err)
+	}
+	resp := new(tipb.SelectResponse)
+	if err = proto.Unmarshal(b, resp); err != nil {
+		return errors.Trace(err)
+	}
+	// Publish the response only once it is fully decoded, so that a failed
+	// load never looks like an empty result to a later call.
+	r.resp = resp
+	return nil
+}
+
+// Close closes the sub result.
+// It closes the reader when Next has not consumed it yet, so a sub result
+// that is dropped without being iterated does not leak its reader.
+func (r *SubResult) Close() error {
+	if r.reader == nil {
+		return nil
+	}
+	err := r.reader.Close()
+	r.reader = nil
+	if err != nil {
+		return errors.Trace(err)
+	}
+	return nil
+}
+
+// Select do a select request, returns SelectResult.
+// The request must carry either an index info or a table info; the columns
+// of that info define the field types used to decode the returned rows.
+func Select(client kv.Client, req *tipb.SelectRequest, concurrency int) (*SelectResult, error) {
+	if req == nil {
+		return nil, errors.New("select request is nil")
+	}
+	// Resolve the columns before anything is sent, so that an unusable
+	// request fails without leaving an open response behind. The index info
+	// takes precedence, matching how composeRequest picks the request type.
+	var columns []*tipb.ColumnInfo
+	switch {
+	case req.IndexInfo != nil:
+		columns = req.IndexInfo.Columns
+	case req.TableInfo != nil:
+		columns = req.TableInfo.Columns
+	default:
+		return nil, errors.New("select request has neither table info nor index info")
+	}
+	// Convert tipb.*Request to kv.Request
+	kvReq, err := composeRequest(req, concurrency)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	resp := client.Send(kvReq)
+	if resp == nil {
+		return nil, errors.New("client returns nil response")
+	}
+	fields := tablecodec.ProtoColumnsToFieldTypes(columns)
+	return &SelectResult{fields: fields, resp: resp}, nil
+}
+
+// Convert tipb.Request to kv.Request.
+// A request carrying an index info becomes an index request, any other
+// request a table select; the serialized req is attached as the payload.
+func composeRequest(req *tipb.SelectRequest, concurrency int) (*kv.Request, error) {
+	kvReq := new(kv.Request)
+	kvReq.Concurrency = concurrency
+	if idx := req.IndexInfo; idx != nil {
+		kvReq.Tp = kv.ReqTypeIndex
+		kvReq.KeyRanges = tablecodec.EncodeIndexRanges(idx.GetTableId(), req.Ranges, req.Points)
+	} else {
+		kvReq.Tp = kv.ReqTypeSelect
+		kvReq.KeyRanges = tablecodec.EncodeTableRanges(req.GetTableInfo().GetTableId(), req.Ranges, req.Points)
+	}
+	payload, err := proto.Marshal(req)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	kvReq.Data = payload
+	return kvReq, nil
+}
+
+// SupportExpression reports whether the client supports the expression.
+// It currently returns false for every client and expression.
+func SupportExpression(client kv.Client, expr *tipb.Expr) bool {
+	return false
+}