Merge pull request #1002 from pingcap/coocood/xapi

xapi: add xapi functions.
This commit is contained in:
Ewan Chou
2016-03-23 17:42:53 +08:00
6 changed files with 586 additions and 40 deletions

View File

@ -26,6 +26,29 @@ func (k Key) Next() Key {
return buf
}
// PartialNext returns the next partial key.
// For example, a key composed with two columns.
// Next will return a key with the same first column value,
// but PartialNext will return a key with different first column value.
// key encoding method must ensure the next different value has the
// same length as the original value.
func (k Key) PartialNext() Key {
buf := make([]byte, len([]byte(k)))
copy(buf, []byte(k))
var i int
for i = len(k) - 1; i >= 0; i-- {
buf[i]++
if buf[i] != 0 {
break
}
}
if i == -1 {
copy(buf, k)
buf = append(buf, 0)
}
return buf
}
// Cmp returns the comparison result of two key.
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
func (k Key) Cmp(another Key) int {

View File

@ -125,6 +125,8 @@ type Response interface {
// Next returns a resultSubset from a single storage unit.
// When full result set is returned, nil is returned.
Next() (resultSubset io.ReadCloser, err error)
// Close response.
Close() error
}
// Snapshot defines the interface for the snapshot fetched from KV store.

View File

@ -108,52 +108,13 @@ func Decode(b []byte) ([]types.Datum, error) {
}
var (
flag byte
err error
values = make([]types.Datum, 0, 1)
)
for len(b) > 0 {
flag = b[0]
b = b[1:]
var d types.Datum
switch flag {
case intFlag:
var v int64
b, v, err = DecodeInt(b)
d.SetInt64(v)
case uintFlag:
var v uint64
b, v, err = DecodeUint(b)
d.SetUint64(v)
case floatFlag:
var v float64
b, v, err = DecodeFloat(b)
d.SetFloat64(v)
case bytesFlag:
var v []byte
b, v, err = DecodeBytes(b)
d.SetBytes(v)
case compactBytesFlag:
var v []byte
b, v, err = DecodeCompactBytes(b)
d.SetBytes(v)
case decimalFlag:
var v mysql.Decimal
b, v, err = DecodeDecimal(b)
d.SetValue(v)
case durationFlag:
var r int64
b, r, err = DecodeInt(b)
if err == nil {
// use max fsp, let outer to do round manually.
v := mysql.Duration{Duration: time.Duration(r), Fsp: mysql.MaxFsp}
d.SetValue(v)
}
case nilFlag:
default:
return nil, errors.Errorf("invalid encoded key flag %v", flag)
}
b, d, err = DecodeOne(b)
if err != nil {
return nil, errors.Trace(err)
}
@ -163,3 +124,53 @@ func Decode(b []byte) ([]types.Datum, error) {
return values, nil
}
// DecodeOne decodes on datum from a byte slice generated with EncodeKey or EncodeValue.
func DecodeOne(b []byte) (remain []byte, d types.Datum, err error) {
if len(b) < 1 {
return nil, d, errors.New("invalid encoded key")
}
flag := b[0]
b = b[1:]
switch flag {
case intFlag:
var v int64
b, v, err = DecodeInt(b)
d.SetInt64(v)
case uintFlag:
var v uint64
b, v, err = DecodeUint(b)
d.SetUint64(v)
case floatFlag:
var v float64
b, v, err = DecodeFloat(b)
d.SetFloat64(v)
case bytesFlag:
var v []byte
b, v, err = DecodeBytes(b)
d.SetBytes(v)
case compactBytesFlag:
var v []byte
b, v, err = DecodeCompactBytes(b)
d.SetBytes(v)
case decimalFlag:
var v mysql.Decimal
b, v, err = DecodeDecimal(b)
d.SetValue(v)
case durationFlag:
var r int64
b, r, err = DecodeInt(b)
if err == nil {
// use max fsp, let outer to do round manually.
v := mysql.Duration{Duration: time.Duration(r), Fsp: mysql.MaxFsp}
d.SetValue(v)
}
case nilFlag:
default:
return b, d, errors.Errorf("invalid encoded key flag %v", flag)
}
if err != nil {
return b, d, errors.Trace(err)
}
return b, d, nil
}

View File

@ -0,0 +1,344 @@
package tablecodec
import (
"sort"
"time"
"bytes"
"github.com/golang/protobuf/proto"
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tidb/xapi/tipb"
)
var (
tablePrefix = []byte{'t'}
recordPrefixSep = []byte("_r")
indexPrefixSep = []byte("_i")
)
const (
idLen = 8
prefixLen = 1 + idLen /*tableID*/ + 2
recordRowKeyLen = prefixLen + idLen /*handle*/
)
// EncodeRowKey encodes the table id and record handle into a kv.Key
func EncodeRowKey(tableID int64, encodedHandle []byte) kv.Key {
buf := make([]byte, 0, recordRowKeyLen)
buf = appendTableRecordPrefix(buf, tableID)
buf = append(buf, encodedHandle...)
return buf
}
// EncodeColumnKey encodes the table id, row handle and columnID into a kv.Key
func EncodeColumnKey(tableID int64, encodedHandle []byte, columnID int64) kv.Key {
buf := make([]byte, 0, recordRowKeyLen+idLen)
buf = appendTableRecordPrefix(buf, tableID)
buf = append(buf, encodedHandle...)
buf = codec.EncodeInt(buf, columnID)
return buf
}
// DecodeRowKey decodes the key and gets the handle.
func DecodeRowKey(key kv.Key) (handle int64, err error) {
k := key
if !key.HasPrefix(tablePrefix) {
return 0, errors.Errorf("invalid record key - %q", k)
}
key = key[len(tablePrefix):]
// Table ID is not needed.
key, _, err = codec.DecodeInt(key)
if err != nil {
return 0, errors.Trace(err)
}
if !key.HasPrefix(recordPrefixSep) {
return 0, errors.Errorf("invalid record key - %q", k)
}
key = key[len(recordPrefixSep):]
key, handle, err = codec.DecodeInt(key)
if err != nil {
return 0, errors.Trace(err)
}
return
}
// DecodeValues decodes a byte slice into datums with column types.
func DecodeValues(data []byte, fts []*types.FieldType) ([]types.Datum, error) {
values, err := codec.Decode(data)
if err != nil {
return nil, errors.Trace(err)
}
if len(values) > len(fts) {
return nil, errors.Errorf("invalid column count %d is less than value count %d", len(fts), len(values))
}
for i := range values {
values[i], err = unflatten(values[i], fts[i])
if err != nil {
return nil, errors.Trace(err)
}
}
return values, nil
}
// unflatten converts a raw datum to a column datum.
func unflatten(datum types.Datum, ft *types.FieldType) (types.Datum, error) {
if datum.Kind() == types.KindNull {
return datum, nil
}
switch ft.Tp {
case mysql.TypeFloat:
datum.SetFloat32(float32(datum.GetFloat64()))
return datum, nil
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeYear, mysql.TypeInt24,
mysql.TypeLong, mysql.TypeLonglong, mysql.TypeDouble, mysql.TypeTinyBlob,
mysql.TypeMediumBlob, mysql.TypeBlob, mysql.TypeLongBlob, mysql.TypeVarchar,
mysql.TypeString:
return datum, nil
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp:
var t mysql.Time
t.Type = ft.Tp
t.Fsp = ft.Decimal
err := t.Unmarshal(datum.GetBytes())
if err != nil {
return datum, errors.Trace(err)
}
datum.SetValue(t)
return datum, nil
case mysql.TypeDuration:
dur := mysql.Duration{Duration: time.Duration(datum.GetInt64())}
datum.SetValue(dur)
return datum, nil
case mysql.TypeNewDecimal:
dec, err := mysql.ParseDecimal(datum.GetString())
if err != nil {
return datum, errors.Trace(err)
}
datum.SetValue(dec)
return datum, nil
case mysql.TypeEnum:
enum, err := mysql.ParseEnumValue(ft.Elems, datum.GetUint64())
if err != nil {
return datum, errors.Trace(err)
}
datum.SetValue(enum)
return datum, nil
case mysql.TypeSet:
set, err := mysql.ParseSetValue(ft.Elems, datum.GetUint64())
if err != nil {
return datum, errors.Trace(err)
}
datum.SetValue(set)
return datum, nil
case mysql.TypeBit:
bit := mysql.Bit{Value: datum.GetUint64(), Width: ft.Flen}
datum.SetValue(bit)
return datum, nil
}
return datum, nil
}
// EncodeIndexSeekKey encodes an index value to kv.Key.
func EncodeIndexSeekKey(tableID int64, encodedValue []byte) kv.Key {
key := make([]byte, 0, prefixLen+len(encodedValue))
key = appendTableIndexPrefix(key, tableID)
key = append(key, encodedValue...)
return key
}
// IndexRowData extracts the row data and handle in an index key.
// If the index is unique, handle would be nil.
func IndexRowData(key kv.Key, columnCount int) (data, handle []byte, err error) {
b := key[prefixLen:]
// The index key may have primary key appended to the end, we decode the column values
// only to get the column values length.
for columnCount > 0 {
b, _, err = codec.DecodeOne(b)
if err != nil {
return nil, nil, errors.Trace(err)
}
columnCount--
}
handle = b
return key[prefixLen : len(key)-len(handle)], handle, nil
}
// Record prefix is "t[tableID]_r".
func appendTableRecordPrefix(buf []byte, tableID int64) []byte {
buf = append(buf, tablePrefix...)
buf = codec.EncodeInt(buf, tableID)
buf = append(buf, recordPrefixSep...)
return buf
}
// Index prefix is "t[tableID]_i".
func appendTableIndexPrefix(buf []byte, tableID int64) []byte {
buf = append(buf, tablePrefix...)
buf = codec.EncodeInt(buf, tableID)
buf = append(buf, indexPrefixSep...)
return buf
}
type int64Slice []int64
func (p int64Slice) Len() int { return len(p) }
func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// SortInt64Slice sorts an int64 slice.
func SortInt64Slice(s []int64) {
sort.Sort(int64Slice(s))
}
func columnToProto(c *model.ColumnInfo) *tipb.ColumnInfo {
pc := &tipb.ColumnInfo{
ColumnId: proto.Int64(c.ID),
Collation: proto.Int32(collationToProto(c.FieldType.Collate)),
ColumnLen: proto.Int32(int32(c.FieldType.Flen)),
Decimal: proto.Int32(int32(c.FieldType.Decimal)),
Elems: c.Elems,
}
t := int32(c.FieldType.Tp)
pc.Tp = &t
return pc
}
func collationToProto(c string) int32 {
v, ok := mysql.CollationNames[c]
if ok {
return int32(v)
}
return int32(mysql.DefaultCollationID)
}
// TableToProto converts a model.TableInfo to a tipb.TableInfo.
func TableToProto(t *model.TableInfo) *tipb.TableInfo {
pt := &tipb.TableInfo{
TableId: proto.Int64(t.ID),
}
cols := make([]*tipb.ColumnInfo, 0, len(t.Columns))
for _, c := range t.Columns {
col := columnToProto(c)
if t.PKIsHandle && mysql.HasPriKeyFlag(c.Flag) {
col.PkHandle = proto.Bool(true)
} else {
col.PkHandle = proto.Bool(false)
}
cols = append(cols, col)
}
pt.Columns = cols
return pt
}
// ProtoColumnsToFieldTypes converts tipb column info slice to FieldTyps slice.
func ProtoColumnsToFieldTypes(pColumns []*tipb.ColumnInfo) []*types.FieldType {
fields := make([]*types.FieldType, len(pColumns))
for i, v := range pColumns {
field := new(types.FieldType)
field.Tp = byte(v.GetTp())
field.Collate = mysql.Collations[byte(v.GetCollation())]
field.Decimal = int(v.GetDecimal())
field.Flen = int(v.GetColumnLen())
field.Flag = uint(v.GetFlag())
field.Elems = v.GetElems()
fields[i] = field
}
return fields
}
// IndexToProto converts a model.IndexInfo to a tipb.IndexInfo.
func IndexToProto(t *model.TableInfo, idx *model.IndexInfo) *tipb.IndexInfo {
pi := &tipb.IndexInfo{
TableId: proto.Int64(t.ID),
IndexId: proto.Int64(idx.ID),
Unique: proto.Bool(idx.Unique),
}
cols := make([]*tipb.ColumnInfo, 0, len(idx.Columns))
for _, c := range t.Columns {
cols = append(cols, columnToProto(c))
}
pi.Columns = cols
return pi
}
// EncodeTableRanges encodes table ranges into kv.KeyRanges.
func EncodeTableRanges(tid int64, rans []*tipb.KeyRange, points [][]byte) []kv.KeyRange {
keyRanges := make([]kv.KeyRange, 0, len(rans)+len(points))
for _, r := range rans {
start := EncodeRowKey(tid, r.Low)
end := EncodeRowKey(tid, r.High)
nr := kv.KeyRange{
StartKey: start,
EndKey: end,
}
keyRanges = append(keyRanges, nr)
}
for _, pdata := range points {
// Convert KeyPoint to kv.KeyRange
start := EncodeRowKey(tid, pdata)
nr := kv.KeyRange{
StartKey: start,
EndKey: start.PartialNext(),
}
keyRanges = append(keyRanges, nr)
}
sortKeyRange(keyRanges)
return keyRanges
}
// EncodeIndexRanges encodes index ranges into kv.KeyRanges.
func EncodeIndexRanges(tid int64, rans []*tipb.KeyRange, points [][]byte) []kv.KeyRange {
keyRanges := make([]kv.KeyRange, 0, len(rans)+len(points))
for _, r := range rans {
// Convert range to kv.KeyRange
start := EncodeIndexSeekKey(tid, r.Low)
end := EncodeIndexSeekKey(tid, r.High)
nr := kv.KeyRange{
StartKey: start,
EndKey: end,
}
keyRanges = append(keyRanges, nr)
}
for _, pdata := range points {
start := EncodeIndexSeekKey(tid, pdata)
nr := kv.KeyRange{
StartKey: start,
EndKey: start.PartialNext(),
}
keyRanges = append(keyRanges, nr)
}
sortKeyRange(keyRanges)
return keyRanges
}
func sortKeyRange(ranges []kv.KeyRange) {
sorter := keyRangeSorter{ranges: ranges}
sort.Sort(&sorter)
}
type keyRangeSorter struct {
ranges []kv.KeyRange
}
func (r *keyRangeSorter) Len() int {
return len(r.ranges)
}
func (r *keyRangeSorter) Less(i, j int) bool {
a := r.ranges[i]
b := r.ranges[j]
cmp := bytes.Compare(a.StartKey, b.StartKey)
return cmp < 0
}
func (r *keyRangeSorter) Swap(i, j int) {
r.ranges[i], r.ranges[j] = r.ranges[j], r.ranges[i]
}

View File

@ -0,0 +1,30 @@
package tablecodec
import (
"testing"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/codec"
)
func TestT(t *testing.T) {
TestingT(t)
}
var _ = Suite(&tableCodecSuite{})
type tableCodecSuite struct{}
// TODO: add more tests.
func (s *tableCodecSuite) TestTableCodec(c *C) {
key := EncodeRowKey(1, codec.EncodeInt(nil, 2))
h, err := DecodeRowKey(key)
c.Assert(err, IsNil)
c.Assert(h, Equals, int64(2))
key = EncodeColumnKey(1, codec.EncodeInt(nil, 2), 3)
h, err = DecodeRowKey(key)
c.Assert(err, IsNil)
c.Assert(h, Equals, int64(2))
}

136
xapi/xapi.go Normal file
View File

@ -0,0 +1,136 @@
package xapi
import (
"io"
"io/ioutil"
"github.com/golang/protobuf/proto"
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tidb/xapi/tablecodec"
"github.com/pingcap/tidb/xapi/tipb"
)
// SelectResult is used to get response rows from SelectRequest.
type SelectResult struct {
fields []*types.FieldType
resp kv.Response
}
// Next returns the next row.
func (r *SelectResult) Next() (subResult *SubResult, err error) {
var reader io.ReadCloser
reader, err = r.resp.Next()
if err != nil {
return nil, errors.Trace(err)
}
if reader == nil {
return nil, nil
}
subResult = &SubResult{
fields: r.fields,
reader: reader,
}
return
}
// Close closes SelectResult.
func (r *SelectResult) Close() error {
return r.resp.Close()
}
// SubResult represents a subset of select result.
type SubResult struct {
fields []*types.FieldType
reader io.ReadCloser
resp *tipb.SelectResponse
cursor int
}
// Next returns the next row of the sub result.
// If no more row to return, data would be nil.
func (r *SubResult) Next() (handle int64, data []types.Datum, err error) {
if r.resp == nil {
r.resp = new(tipb.SelectResponse)
var b []byte
b, err = ioutil.ReadAll(r.reader)
r.reader.Close()
if err != nil {
return 0, nil, errors.Trace(err)
}
err = proto.Unmarshal(b, r.resp)
if err != nil {
return 0, nil, errors.Trace(err)
}
}
if r.cursor >= len(r.resp.Rows) {
return 0, nil, nil
}
row := r.resp.Rows[r.cursor]
data, err = tablecodec.DecodeValues(row.Data, r.fields)
if err != nil {
return 0, nil, errors.Trace(err)
}
handleBytes := row.GetHandle()
_, handle, err = codec.DecodeInt(handleBytes)
if err != nil {
return 0, nil, errors.Trace(err)
}
r.cursor++
return
}
// Close closes the sub result.
func (r *SubResult) Close() error {
return nil
}
// Select do a select request, returns SelectResult.
func Select(client kv.Client, req *tipb.SelectRequest, concurrency int) (*SelectResult, error) {
// Convert tipb.*Request to kv.Request
kvReq, err := composeRequest(req, concurrency)
if err != nil {
return nil, errors.Trace(err)
}
resp := client.Send(kvReq)
if resp == nil {
return nil, errors.New("client returns nil response")
}
var columns []*tipb.ColumnInfo
if req.TableInfo != nil {
columns = req.TableInfo.Columns
} else {
columns = req.IndexInfo.Columns
}
fields := tablecodec.ProtoColumnsToFieldTypes(columns)
return &SelectResult{fields: fields, resp: resp}, nil
}
// Convert tipb.Request to kv.Request.
func composeRequest(req *tipb.SelectRequest, concurrency int) (*kv.Request, error) {
kvReq := &kv.Request{
Concurrency: concurrency,
}
if req.IndexInfo != nil {
kvReq.Tp = kv.ReqTypeIndex
tid := req.IndexInfo.GetTableId()
kvReq.KeyRanges = tablecodec.EncodeIndexRanges(tid, req.Ranges, req.Points)
} else {
kvReq.Tp = kv.ReqTypeSelect
tid := req.GetTableInfo().GetTableId()
kvReq.KeyRanges = tablecodec.EncodeTableRanges(tid, req.Ranges, req.Points)
}
var err error
kvReq.Data, err = proto.Marshal(req)
if err != nil {
return nil, errors.Trace(err)
}
return kvReq, nil
}
// SupportExpression checks if the expression is supported by the client.
func SupportExpression(client kv.Client, expr *tipb.Expr) bool {
return false
}