Merge pull request #1020 from pingcap/coocood/local-x

store/localstore: add local store xapi support.
This commit is contained in:
Ewan Chou
2016-03-29 21:18:16 +08:00
16 changed files with 727 additions and 125 deletions

View File

@ -93,6 +93,7 @@ func resultRowToRow(t table.Table, h int64, data []types.Datum) *Row {
}
func (e *XSelectTableExec) doRequest() error {
// TODO: add offset and limit.
txn, err := e.ctx.GetTxn(false)
if err != nil {
return errors.Trace(err)
@ -138,6 +139,9 @@ func (e *XSelectIndexExec) Next() (*Row, error) {
return nil, nil
}
row := e.rows[e.cursor]
for i, field := range e.indexPlan.Fields() {
field.Expr.SetDatum(row.Data[i])
}
e.cursor++
return row, nil
}
@ -160,33 +164,53 @@ func (e *XSelectIndexExec) doRequest() error {
if err != nil {
return errors.Trace(err)
}
indexOrder := make(map[int64]int)
for i, h := range handles {
indexOrder[h] = i
if len(handles) == 0 {
return nil
}
var indexOrder map[int64]int
if !e.indexPlan.OutOfOrder {
// Save the index order.
indexOrder = make(map[int64]int)
for i, h := range handles {
indexOrder[h] = i
}
}
sort.Sort(int64Slice(handles))
tblResult, err := e.doTableRequest(txn, handles)
unorderedRows, err := extractRowsFromTableResult(e.table, tblResult)
rows, err := extractRowsFromTableResult(e.table, tblResult)
if err != nil {
return errors.Trace(err)
}
// Restore the original index order.
rows := make([]*Row, len(handles))
for i, h := range handles {
oi := indexOrder[h]
rows[oi] = unorderedRows[i]
if len(rows) < len(handles) {
return errors.Errorf("got %d rows with %d handles", len(rows), len(handles))
}
if !e.indexPlan.OutOfOrder {
// Restore the index order.
orderedRows := make([]*Row, len(handles))
for i, h := range handles {
oi := indexOrder[h]
orderedRows[oi] = rows[i]
}
rows = orderedRows
}
e.rows = rows
return nil
}
func (e *XSelectIndexExec) doIndexRequest(txn kv.Transaction) (*xapi.SelectResult, error) {
// TODO: add offset and limit.
selIdxReq := new(tipb.SelectRequest)
startTs := txn.StartTS()
selIdxReq.StartTs = &startTs
selIdxReq.IndexInfo = tablecodec.IndexToProto(e.table.Meta(), e.indexPlan.Index)
fieldTypes := make([]*types.FieldType, len(e.indexPlan.Index.Columns))
for i, v := range e.indexPlan.Index.Columns {
fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType)
}
var err error
selIdxReq.Ranges, err = indexRangesToPBRanges(e.indexPlan.Ranges)
selIdxReq.Ranges, err = indexRangesToPBRanges(e.indexPlan.Ranges, fieldTypes)
if err != nil {
return nil, errors.Trace(err)
}
@ -194,6 +218,7 @@ func (e *XSelectIndexExec) doIndexRequest(txn kv.Transaction) (*xapi.SelectResul
}
func (e *XSelectIndexExec) doTableRequest(txn kv.Transaction, handles []int64) (*xapi.SelectResult, error) {
// TODO: add offset and limit.
selTableReq := new(tipb.SelectRequest)
startTs := txn.StartTS()
selTableReq.StartTs = &startTs
@ -206,7 +231,7 @@ func (e *XSelectIndexExec) doTableRequest(txn kv.Transaction, handles []int64) (
}
pbRange := new(tipb.KeyRange)
pbRange.Low = codec.EncodeInt(nil, h)
pbRange.High = codec.EncodeInt(nil, h)
pbRange.High = kv.Key(pbRange.Low).PrefixNext()
selTableReq.Ranges = append(selTableReq.Ranges, pbRange)
}
selTableReq.Where = conditionsToPBExpression(e.indexPlan.FilterConditions...)
@ -236,28 +261,92 @@ func tableRangesToPBRanges(tableRanges []plan.TableRange) []*tipb.KeyRange {
return hrs
}
func indexRangesToPBRanges(ranges []*plan.IndexRange) ([]*tipb.KeyRange, error) {
func indexRangesToPBRanges(ranges []*plan.IndexRange, fieldTypes []*types.FieldType) ([]*tipb.KeyRange, error) {
keyRanges := make([]*tipb.KeyRange, 0, len(ranges))
for _, ran := range ranges {
low, err := codec.EncodeValue(nil, ran.LowVal...)
err := convertIndexRangeTypes(ran, fieldTypes)
if err != nil {
return nil, errors.Trace(err)
}
low, err := codec.EncodeKey(nil, ran.LowVal...)
if err != nil {
return nil, errors.Trace(err)
}
if ran.LowExclude {
low = []byte(kv.Key(low).PartialNext())
low = []byte(kv.Key(low).PrefixNext())
}
high, err := codec.EncodeValue(nil, ran.HighVal...)
high, err := codec.EncodeKey(nil, ran.HighVal...)
if err != nil {
return nil, errors.Trace(err)
}
if !ran.HighExclude {
high = []byte(kv.Key(high).PartialNext())
high = []byte(kv.Key(high).PrefixNext())
}
keyRanges = append(keyRanges, &tipb.KeyRange{Low: low, High: high})
}
return keyRanges, nil
}
// convertIndexRangeTypes converts the boundary datums of ran to the index
// columns' field types so they encode/compare correctly, adjusting
// LowExclude/HighExclude whenever the conversion changes a value, so the
// range still selects the same rows. fieldTypes[i] is the type of the i-th
// index column.
func convertIndexRangeTypes(ran *plan.IndexRange, fieldTypes []*types.FieldType) error {
	for i := range ran.LowVal {
		if ran.LowVal[i].Kind() == types.KindMinNotNull {
			// MinNotNull marks the open low end; represent it with empty
			// bytes (presumably the minimal non-null encoded key — verify
			// against the codec).
			ran.LowVal[i].SetBytes([]byte{})
			continue
		}
		converted, err := ran.LowVal[i].ConvertTo(fieldTypes[i])
		if err != nil {
			return errors.Trace(err)
		}
		// Compare the converted value with the original to detect truncation.
		cmp, err := converted.CompareDatum(ran.LowVal[i])
		if err != nil {
			return errors.Trace(err)
		}
		ran.LowVal[i] = converted
		if cmp == 0 {
			continue
		}
		if cmp < 0 && !ran.LowExclude {
			// For int column a, a >= 1.1 is converted to a > 1.
			ran.LowExclude = true
		} else if cmp > 0 && ran.LowExclude {
			// For int column a, a > 1.9 is converted to a >= 2.
			ran.LowExclude = false
		}
		// The converted value has changed, the other column values doesn't matter.
		// For equal condition, converted value changed means there will be no match.
		// For non equal condition, this column would be the last one to build the range.
		// Break here to prevent the rest columns modify LowExclude again.
		break
	}
	for i := range ran.HighVal {
		if ran.HighVal[i].Kind() == types.KindMaxValue {
			// MaxValue marks the open high end; no conversion needed.
			continue
		}
		converted, err := ran.HighVal[i].ConvertTo(fieldTypes[i])
		if err != nil {
			return errors.Trace(err)
		}
		cmp, err := converted.CompareDatum(ran.HighVal[i])
		if err != nil {
			return errors.Trace(err)
		}
		ran.HighVal[i] = converted
		if cmp == 0 {
			continue
		}
		// For int column a, a < 1.1 is converted to a <= 1.
		if cmp < 0 && ran.HighExclude {
			ran.HighExclude = false
		}
		// For int column a, a <= 1.9 is converted to a < 2.
		if cmp > 0 && !ran.HighExclude {
			ran.HighExclude = true
		}
		break
	}
	return nil
}
func extractHandlesFromIndexResult(idxResult *xapi.SelectResult) ([]int64, error) {
var handles []int64
for {

View File

@ -26,13 +26,18 @@ func (k Key) Next() Key {
return buf
}
// PartialNext returns the next partial key.
// For example, a key composed with two columns.
// Next will return a key with the same first column value,
// but PartialNext will return a key with different first column value.
// key encoding method must ensure the next different value has the
// same length as the original value.
func (k Key) PartialNext() Key {
// PrefixNext returns the next prefix key.
//
// Assume there are keys like:
//
// rowkey1
// rowkey1_column1
// rowkey1_column2
// rowKey2
//
// If we seek 'rowkey1' Next, we will get 'rowkey1_colum1'.
// If we seek 'rowkey1' PrefixNext, we will get 'rowkey2'.
func (k Key) PrefixNext() Key {
buf := make([]byte, len([]byte(k)))
copy(buf, []byte(k))
var i int

View File

@ -40,7 +40,7 @@ func (s *testKeySuite) TestPartialNext(c *C) {
c.Assert(cmp, Equals, -1)
// Use next partial key, we can skip all index keys with first column value equal to "abc".
nextPartialKey := Key(seekKey).PartialNext()
nextPartialKey := Key(seekKey).PrefixNext()
cmp = bytes.Compare(nextPartialKey, keyA)
c.Assert(cmp, Equals, 1)

View File

@ -13,7 +13,10 @@
package kv
import "io"
import (
"bytes"
"io"
)
const (
// PresumeKeyNotExists directives that when dealing with a Get operation but failing to read data from cache,
@ -97,6 +100,8 @@ type Client interface {
const (
ReqTypeSelect = 101
ReqTypeIndex = 102
ReqSubTypeBasic = 0
)
// KeyRange represents a range where StartKey <= key < EndKey.
@ -105,6 +110,11 @@ type KeyRange struct {
EndKey Key
}
// IsPoint checks if the key range represents a point.
func (r *KeyRange) IsPoint() bool {
return bytes.Equal(r.StartKey.PrefixNext(), r.EndKey)
}
// Request represents a kv request.
type Request struct {
// The request type.

View File

@ -602,7 +602,7 @@ func (b *planBuilder) buildJoin(sel *ast.SelectStmt) Plan {
if !path.attachCondition(whereCond, nil) {
// TODO: Find a better way to handle this condition.
path.conditions = append(path.conditions, whereCond)
log.Errorf("Failed to attach where condtion.")
log.Warnf("Failed to attach where condtion in %s", sel.Text())
}
}
path.extractEqualConditon()

View File

@ -135,6 +135,9 @@ type IndexScan struct {
// FilterConditions can be used to filter result.
FilterConditions []ast.ExprNode
// OutOfOrder indicates if the index scan can return out of order.
OutOfOrder bool
}
// Accept implements Plan Accept interface.

View File

@ -283,6 +283,8 @@ type dbStore struct {
mu sync.Mutex
closed bool
pd localPD
}
type storeCache struct {
@ -354,8 +356,14 @@ func (d Driver) Open(path string) (kv.Storage, error) {
s.recentUpdates, err = segmentmap.NewSegmentMap(100)
if err != nil {
return nil, errors.Trace(err)
}
regionServers := buildLocalRegionServers(s)
var infos []*regionInfo
for _, rs := range regionServers {
ri := &regionInfo{startKey: rs.startKey, endKey: rs.endKey, rs: rs}
infos = append(infos, ri)
}
s.pd.SetRegionInfo(infos)
mc.cache[engineSchema] = s
s.compactor.Start()
s.wg.Add(1)

View File

@ -0,0 +1,185 @@
package localstore
import (
"io"
"github.com/pingcap/tidb/kv"
)
type dbClient struct {
store *dbStore
regionInfo []*regionInfo
}
// Send implements the kv.Client interface. It splits the request key
// ranges into per-region tasks and executes them with bounded
// concurrency; the returned kv.Response yields one region result per
// Next call.
func (c *dbClient) Send(req *kv.Request) kv.Response {
	it := &response{
		client:      c,
		concurrency: req.Concurrency,
	}
	it.tasks = buildRegionTasks(c, req)
	if len(it.tasks) == 0 {
		// Empty range doesn't produce any task.
		it.finished = true
		return it
	}
	// Clamp concurrency to [1, len(tasks)].
	if it.concurrency > len(it.tasks) {
		it.concurrency = len(it.tasks)
	} else if it.concurrency <= 0 {
		it.concurrency = 1
	}
	it.taskChan = make(chan *task, it.concurrency)
	it.errChan = make(chan error, it.concurrency)
	it.respChan = make(chan *regionResponse, it.concurrency)
	it.run()
	return it
}
// SupportRequestType implements the kv.Client interface. The local
// store handles only basic select and index requests.
func (c *dbClient) SupportRequestType(reqType, subType int64) bool {
	if reqType != kv.ReqTypeSelect && reqType != kv.ReqTypeIndex {
		return false
	}
	return subType == kv.ReqSubTypeBasic
}
// updateRegionInfo refreshes the cached region layout from the local
// placement driver; called after a response reports changed boundaries.
func (c *dbClient) updateRegionInfo() {
	c.regionInfo = c.store.pd.GetRegionInfo()
}
// localResponseReader adapts an in-memory byte slice to io.ReadCloser
// so region response data can be streamed to the caller.
type localResponseReader struct {
	s []byte
	i int64
}

// Read copies up to len(b) unread bytes into b, returning io.EOF once
// the slice is exhausted. Reading into an empty buffer returns (0, nil).
func (r *localResponseReader) Read(b []byte) (int, error) {
	if len(b) == 0 {
		return 0, nil
	}
	if int64(len(r.s))-r.i <= 0 {
		return 0, io.EOF
	}
	copied := copy(b, r.s[r.i:])
	r.i += int64(copied)
	return copied, nil
}

// Close marks the reader as fully consumed; subsequent reads hit EOF.
func (r *localResponseReader) Close() error {
	r.i = int64(len(r.s))
	return nil
}
// response implements the kv.Response interface. Tasks flow to worker
// goroutines through taskChan; results and errors come back on respChan
// and errChan.
type response struct {
	client      *dbClient
	reqSent     int // number of tasks handed to workers so far
	respGot     int // number of region responses consumed so far
	concurrency int
	tasks       []*task
	responses   []*regionResponse
	taskChan    chan *task
	respChan    chan *regionResponse
	errChan     chan error
	finished    bool
}

// task pairs one region-scoped request with the region executing it.
type task struct {
	request *regionRequest
	region  *localRegion
}
// Next implements the kv.Response interface. It waits for one region
// response (or a worker error), feeds the next pending task to the
// workers, and closes the iterator after the last task is answered.
// A (nil, nil) return means the response is exhausted.
func (it *response) Next() (resp io.ReadCloser, err error) {
	if it.finished {
		return nil, nil
	}
	var regionResp *regionResponse
	// Block until either a result or an error arrives.
	select {
	case regionResp = <-it.respChan:
	case err = <-it.errChan:
	}
	if err != nil {
		it.Close()
		return nil, err
	}
	if len(regionResp.newStartKey) != 0 {
		// The region's boundaries changed: refresh the layout and queue
		// retry tasks for the ranges this response missed.
		it.client.updateRegionInfo()
		retryTasks := it.createRetryTasks(regionResp)
		it.tasks = append(it.tasks, retryTasks...)
	}
	if it.reqSent < len(it.tasks) {
		it.taskChan <- it.tasks[it.reqSent]
		it.reqSent++
	}
	it.respGot++
	if it.reqSent == len(it.tasks) && it.respGot == it.reqSent {
		// All tasks dispatched and all responses consumed.
		it.Close()
	}
	return &localResponseReader{s: regionResp.data}, nil
}
// createRetryTasks should build tasks covering the key ranges a stale
// region response failed to serve. TODO: not implemented yet — it
// always returns nil, so missed ranges are currently not retried.
func (it *response) createRetryTasks(resp *regionResponse) []*task {
	return nil
}
// buildRegionTasks splits the request's key ranges into one task per
// overlapping region using a single merge-style pass over both lists.
// NOTE(review): the two-cursor walk assumes both req.KeyRanges and
// client.regionInfo are sorted by key — confirm with callers.
func buildRegionTasks(client *dbClient, req *kv.Request) (tasks []*task) {
	infoCursor := 0
	rangeCursor := 0
	var regionReq *regionRequest
	infos := client.regionInfo
	for rangeCursor < len(req.KeyRanges) && infoCursor < len(infos) {
		info := infos[infoCursor]
		ran := req.KeyRanges[rangeCursor]
		// The range lies entirely before (left of) or after (right of)
		// this region's key span.
		rangeOnLeft := ran.EndKey.Cmp(info.startKey) <= 0
		rangeOnRight := info.endKey.Cmp(ran.StartKey) <= 0
		noDataOnRegion := rangeOnLeft || rangeOnRight
		if noDataOnRegion {
			if rangeOnLeft {
				rangeCursor++
			} else {
				infoCursor++
			}
		} else {
			// Overlap: the task covers the whole region span; the region
			// clips requested ranges to its own boundaries when handling.
			regionReq = &regionRequest{
				Tp:       req.Tp,
				startKey: info.startKey,
				endKey:   info.endKey,
				data:     req.Data,
			}
			task := &task{
				region:  info.rs,
				request: regionReq,
			}
			tasks = append(tasks, task)
			infoCursor++
		}
	}
	return
}
// Close implements the kv.Response interface. Closing taskChan makes
// the worker goroutines started by run exit their range loops. The
// finished guard makes repeated Close calls from the same goroutine
// no-ops (closing an already-closed channel would panic).
func (it *response) Close() error {
	// Make goroutines quit.
	if it.finished {
		return nil
	}
	close(it.taskChan)
	it.finished = true
	return nil
}
// run starts the worker goroutines and seeds each with one initial
// task; Next pushes the remaining tasks as responses are consumed.
// A worker exits when taskChan is closed, or after sending a single
// error on errChan.
func (it *response) run() {
	for i := 0; i < it.concurrency; i++ {
		go func() {
			for task := range it.taskChan {
				resp, err := task.region.Handle(task.request)
				if err != nil {
					// Report the error and stop this worker.
					it.errChan <- err
					break
				}
				it.respChan <- resp
			}
		}()
		// Seed one task per worker.
		it.taskChan <- it.tasks[i]
		it.reqSent++
	}
}

View File

@ -0,0 +1,39 @@
package localstore
import "github.com/pingcap/tidb/kv"
// localPD is a minimal in-process placement driver tracking the region
// layout of the local store.
type localPD struct {
	regions []*regionInfo
}

// regionInfo describes one region's key span and the region server that
// owns it.
type regionInfo struct {
	startKey kv.Key
	endKey   kv.Key
	rs       *localRegion
}

// GetRegionInfo returns the current region layout.
func (pd *localPD) GetRegionInfo() []*regionInfo {
	return pd.regions
}

// SetRegionInfo replaces the region layout.
func (pd *localPD) SetRegionInfo(regions []*regionInfo) {
	pd.regions = regions
}
// ChangeRegionInfo used for test handling region info change.
// It rewrites the boundaries of the region identified by regionID, both
// on the region server itself and in the placement driver's layout.
func ChangeRegionInfo(store kv.Storage, regionID int, startKey, endKey []byte) {
	s := store.(*dbStore)
	for i, old := range s.pd.regions {
		if old.rs.id != regionID {
			continue
		}
		// Update the region server's own span.
		old.rs.startKey = startKey
		old.rs.endKey = endKey
		// Publish a fresh regionInfo entry with the new boundaries.
		s.pd.regions[i] = &regionInfo{
			startKey: startKey,
			endKey:   endKey,
			rs:       old.rs,
		}
		break
	}
}

View File

@ -0,0 +1,299 @@
package localstore
import (
"bytes"
"encoding/binary"
"github.com/golang/protobuf/proto"
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tidb/xapi/tablecodec"
"github.com/pingcap/tidb/xapi/tipb"
)
// local region server.
// localRegion executes select/index requests against the slice of the
// local store bounded by [startKey, endKey).
type localRegion struct {
	id       int
	store    *dbStore
	startKey []byte
	endKey   []byte
}

// regionRequest is a request scoped to a single region.
type regionRequest struct {
	Tp       int64  // request type: kv.ReqTypeSelect or kv.ReqTypeIndex
	data     []byte // marshaled tipb.SelectRequest
	startKey []byte
	endKey   []byte
}

// regionResponse carries the marshaled result of a region request.
type regionResponse struct {
	req  *regionRequest
	err  error
	data []byte // marshaled tipb.SelectResponse
	// If region missed some request key range, newStartKey and newEndKey is returned.
	newStartKey []byte
	newEndKey   []byte
}
// Handle executes a region request. For select/index requests it
// unmarshals the embedded tipb.SelectRequest, evaluates it in a
// snapshot transaction at the request's start timestamp, and marshals
// the resulting rows — or the evaluation error — into resp.data. If the
// region's boundaries no longer cover the requested span, the current
// boundaries are reported via newStartKey/newEndKey so the client can
// refresh and retry.
func (rs *localRegion) Handle(req *regionRequest) (*regionResponse, error) {
	resp := &regionResponse{
		req: req,
	}
	if req.Tp == kv.ReqTypeSelect || req.Tp == kv.ReqTypeIndex {
		sel := new(tipb.SelectRequest)
		err := proto.Unmarshal(req.data, sel)
		if err != nil {
			return nil, errors.Trace(err)
		}
		txn := newTxn(rs.store, kv.Version{Ver: uint64(*sel.StartTs)})
		var rows []*tipb.Row
		if req.Tp == kv.ReqTypeSelect {
			rows, err = rs.getRowsFromSelectReq(txn, sel)
		} else {
			rows, err = rs.getRowsFromIndexReq(txn, sel)
		}
		// Evaluation errors travel inside the response payload rather
		// than as a Handle error, so only marshaling failures abort.
		selResp := new(tipb.SelectResponse)
		selResp.Error = toPBError(err)
		selResp.Rows = rows
		resp.err = err
		data, err := proto.Marshal(selResp)
		if err != nil {
			return nil, errors.Trace(err)
		}
		resp.data = data
	}
	if bytes.Compare(rs.startKey, req.startKey) < 0 || bytes.Compare(rs.endKey, req.endKey) > 0 {
		resp.newStartKey = rs.startKey
		resp.newEndKey = rs.endKey
	}
	return resp, nil
}
// getRowsFromSelectReq handles a table-scan request: it clips the
// requested row-key ranges to this region, collects all row handles in
// those ranges, then materializes one tipb.Row per handle.
func (rs *localRegion) getRowsFromSelectReq(txn kv.Transaction, sel *tipb.SelectRequest) ([]*tipb.Row, error) {
	tid := sel.TableInfo.GetTableId()
	// idxID 0 selects the row-key (not index-key) encoding path.
	kvRanges := rs.extractKVRanges(tid, 0, sel.Ranges)
	var handles []int64
	for _, ran := range kvRanges {
		ranHandles, err := seekRangeHandles(tid, txn, ran)
		if err != nil {
			return nil, errors.Trace(err)
		}
		handles = append(handles, ranHandles...)
	}
	var rows []*tipb.Row
	for _, handle := range handles {
		row, err := rs.getRowByHandle(txn, tid, handle, sel.TableInfo.Columns)
		if err != nil {
			return nil, errors.Trace(err)
		}
		rows = append(rows, row)
	}
	return rows, nil
}
// extractKVRanges converts protobuf key ranges into kv.KeyRanges
// clipped to this region's [startKey, endKey) span. idxID == 0 means
// the ranges address row keys; otherwise they address keys of index
// idxID. Ranges entirely below the region are skipped; once a range
// begins at or beyond the region end the loop stops.
// NOTE(review): the early break assumes the input ranges are sorted by
// key — confirm with callers.
func (rs *localRegion) extractKVRanges(tid int64, idxID int64, krans []*tipb.KeyRange) []kv.KeyRange {
	var kvRanges []kv.KeyRange
	for _, kran := range krans {
		var upperKey, lowerKey kv.Key
		if idxID == 0 {
			upperKey = tablecodec.EncodeRowKey(tid, kran.GetHigh())
			if bytes.Compare(upperKey, rs.startKey) <= 0 {
				// Whole range lies before this region.
				continue
			}
			lowerKey = tablecodec.EncodeRowKey(tid, kran.GetLow())
		} else {
			upperKey = tablecodec.EncodeIndexSeekKey(tid, idxID, kran.GetHigh())
			if bytes.Compare(upperKey, rs.startKey) <= 0 {
				continue
			}
			lowerKey = tablecodec.EncodeIndexSeekKey(tid, idxID, kran.GetLow())
		}
		if bytes.Compare(lowerKey, rs.endKey) >= 0 {
			// Range starts beyond this region; later ranges start even
			// further right, so stop.
			break
		}
		// Clip the range to the region boundaries.
		var kvr kv.KeyRange
		if bytes.Compare(lowerKey, rs.startKey) <= 0 {
			kvr.StartKey = rs.startKey
		} else {
			kvr.StartKey = lowerKey
		}
		if bytes.Compare(upperKey, rs.endKey) <= 0 {
			kvr.EndKey = upperKey
		} else {
			kvr.EndKey = rs.endKey
		}
		kvRanges = append(kvRanges, kvr)
	}
	return kvRanges
}
// getRowByHandle builds a tipb.Row for one row handle: Handle is the
// codec-encoded handle, and Data is the concatenation of every
// requested column's stored value. For the handle column itself
// (PkHandle set) the encoded handle bytes are reused instead of doing a
// storage read.
func (rs *localRegion) getRowByHandle(txn kv.Transaction, tid, handle int64, columns []*tipb.ColumnInfo) (*tipb.Row, error) {
	row := new(tipb.Row)
	var d types.Datum
	d.SetInt64(handle)
	var err error
	row.Handle, err = codec.EncodeValue(nil, d)
	if err != nil {
		return nil, errors.Trace(err)
	}
	for _, col := range columns {
		// Use the generated getter: PkHandle is an optional proto field
		// (*bool) and may be nil; dereferencing it directly would panic.
		if col.GetPkHandle() {
			row.Data = append(row.Data, row.Handle...)
		} else {
			key := tablecodec.EncodeColumnKey(tid, handle, col.GetColumnId())
			data, err := txn.Get(key)
			if err != nil {
				return nil, errors.Trace(err)
			}
			row.Data = append(row.Data, data...)
		}
	}
	return row, nil
}
// toPBError wraps a Go error in a tipb.Error with code 1, or returns
// nil for a nil error.
func toPBError(err error) *tipb.Error {
	if err == nil {
		return nil
	}
	code := int32(1)
	msg := err.Error()
	return &tipb.Error{
		Code: &code,
		Msg:  &msg,
	}
}
// seekRangeHandles collects the row handles of all record keys inside
// ran. A point range is resolved with a single Get; otherwise the range
// is walked with repeated seeks, advancing by PrefixNext to skip the
// column keys sharing each row's prefix.
// NOTE(review): tid is currently unused — the handles are decoded from
// the keys themselves; consider dropping the parameter or validating
// the table id against it.
func seekRangeHandles(tid int64, txn kv.Transaction, ran kv.KeyRange) ([]int64, error) {
	if ran.IsPoint() {
		_, err := txn.Get(ran.StartKey)
		if terror.ErrorEqual(err, kv.ErrNotExist) {
			// Missing row at a point range is not an error.
			return nil, nil
		} else if err != nil {
			return nil, errors.Trace(err)
		}
		h, err := tablecodec.DecodeRowKey(ran.StartKey)
		if err != nil {
			return nil, errors.Trace(err)
		}
		return []int64{h}, nil
	}
	seekKey := ran.StartKey
	var handles []int64
	for {
		it, err := txn.Seek(seekKey)
		if err != nil {
			return nil, errors.Trace(err)
		}
		if !it.Valid() || it.Key().Cmp(ran.EndKey) >= 0 {
			break
		}
		h, err := tablecodec.DecodeRowKey(it.Key())
		if err != nil {
			return nil, errors.Trace(err)
		}
		handles = append(handles, h)
		// Jump past every key carrying this row's prefix.
		seekKey = it.Key().PrefixNext()
	}
	return handles, nil
}
// getRowsFromIndexReq handles an index-scan request: it clips the
// requested index ranges to this region and decodes one tipb.Row per
// index entry found.
func (rs *localRegion) getRowsFromIndexReq(txn kv.Transaction, sel *tipb.SelectRequest) ([]*tipb.Row, error) {
	tid := sel.IndexInfo.GetTableId()
	idxID := sel.IndexInfo.GetIndexId()
	kvRanges := rs.extractKVRanges(tid, idxID, sel.Ranges)
	var rows []*tipb.Row
	for _, ran := range kvRanges {
		ranRows, err := getIndexRowFromRange(sel.IndexInfo, txn, ran)
		if err != nil {
			return nil, errors.Trace(err)
		}
		rows = append(rows, ranRows...)
	}
	return rows, nil
}
// getIndexRowFromRange scans one index key range and decodes each entry
// into a tipb.Row: Handle is the codec-encoded row handle, Data holds
// the encoded index column values. If a key carries more datums than
// the index has columns, the trailing datum is the row handle;
// otherwise the handle is decoded from the entry's value.
func getIndexRowFromRange(idxInfo *tipb.IndexInfo, txn kv.Transaction, ran kv.KeyRange) ([]*tipb.Row, error) {
	var rows []*tipb.Row
	seekKey := ran.StartKey
	for {
		it, err := txn.Seek(seekKey)
		// Check the Seek error before touching the iterator: the previous
		// code called it.Key() first, which can panic when Seek fails or
		// the iterator is already invalid.
		if err != nil {
			return nil, errors.Trace(err)
		}
		if !it.Valid() || it.Key().Cmp(ran.EndKey) >= 0 {
			break
		}
		// We have to update the seekKey here, because decoding may change the it.Key(), which should not be allowed.
		// TODO: make sure decoding don't modify the original data.
		seekKey = it.Key().PrefixNext()
		datums, err := tablecodec.DecodeIndexKey(it.Key())
		if err != nil {
			return nil, errors.Trace(err)
		}
		var handle types.Datum
		if len(datums) > len(idxInfo.Columns) {
			// Handle is appended after the index column datums.
			handle = datums[len(idxInfo.Columns)]
			datums = datums[:len(idxInfo.Columns)]
		} else {
			// Handle is stored in the entry's value.
			var intHandle int64
			intHandle, err = decodeHandle(it.Value())
			if err != nil {
				return nil, errors.Trace(err)
			}
			handle.SetInt64(intHandle)
		}
		data, err := codec.EncodeValue(nil, datums...)
		if err != nil {
			return nil, errors.Trace(err)
		}
		handleData, err := codec.EncodeValue(nil, handle)
		if err != nil {
			return nil, errors.Trace(err)
		}
		row := &tipb.Row{Handle: handleData, Data: data}
		rows = append(rows, row)
	}
	return rows, nil
}
// datumStrings renders each datum with ToString, ignoring conversion
// errors (a failed conversion contributes an empty string).
func datumStrings(datums ...types.Datum) []string {
	var result []string
	for i := range datums {
		str, _ := datums[i].ToString()
		result = append(result, str)
	}
	return result
}
// decodeHandle decodes a big-endian int64 row handle from data.
func decodeHandle(data []byte) (int64, error) {
	var handle int64
	err := binary.Read(bytes.NewReader(data), binary.BigEndian, &handle)
	return handle, errors.Trace(err)
}
// buildLocalRegionServers creates the fixed three-region layout of the
// local store: ["", "t"), ["t", "u") and ["u", "z").
func buildLocalRegionServers(store *dbStore) []*localRegion {
	spans := []struct {
		id         int
		start, end string
	}{
		{1, "", "t"},
		{2, "t", "u"},
		{3, "u", "z"},
	}
	regions := make([]*localRegion, 0, len(spans))
	for _, sp := range spans {
		regions = append(regions, &localRegion{
			id:       sp.id,
			store:    store,
			startKey: []byte(sp.start),
			endKey:   []byte(sp.end),
		})
	}
	return regions
}

View File

@ -52,12 +52,12 @@ func newTxn(s *dbStore, ver kv.Version) *dbTxn {
// Implement transaction interface
func (txn *dbTxn) Get(k kv.Key) ([]byte, error) {
log.Debugf("[kv] get key:%q, txn:%d", k, txn.tid)
log.Debugf("[kv] get key:% x, txn:%d", k, txn.tid)
return txn.us.Get(k)
}
func (txn *dbTxn) Set(k kv.Key, data []byte) error {
log.Debugf("[kv] set key:%q, txn:%d", k, txn.tid)
log.Debugf("[kv] set key:% x, txn:%d", k, txn.tid)
txn.dirty = true
return txn.us.Set(k, data)
}
@ -67,12 +67,12 @@ func (txn *dbTxn) String() string {
}
func (txn *dbTxn) Seek(k kv.Key) (kv.Iterator, error) {
log.Debugf("[kv] seek key:%q, txn:%d", k, txn.tid)
log.Debugf("[kv] seek key:% x, txn:%d", k, txn.tid)
return txn.us.Seek(k)
}
func (txn *dbTxn) Delete(k kv.Key) error {
log.Debugf("[kv] delete key:%q, txn:%d", k, txn.tid)
log.Debugf("[kv] delete key:% x, txn:%d", k, txn.tid)
txn.dirty = true
return txn.us.Delete(k)
}
@ -145,16 +145,5 @@ func (txn *dbTxn) StartTS() int64 {
}
func (txn *dbTxn) GetClient() kv.Client {
return &dbClient{}
}
type dbClient struct {
}
func (c *dbClient) SupportRequestType(reqType, subType int64) bool {
return false
}
func (c *dbClient) Send(req *kv.Request) kv.Response {
return nil
return &dbClient{store: txn.store, regionInfo: txn.store.pd.GetRegionInfo()}
}

View File

@ -596,6 +596,7 @@ func (s *testCodecSuite) TestDecimal(c *C) {
{"1234", "1234.0000", 0},
{"1234", "12.34", 1},
{"12.34", "12.35", -1},
{"0.12", "0.1234", -1},
{"0.1234", "12.3400", -1},
{"0.1234", "0.1235", -1},
{"0.123400", "12.34", -1},
@ -609,6 +610,8 @@ func (s *testCodecSuite) TestDecimal(c *C) {
{"-0.0001", "0", -1},
{"-0.1234", "0", -1},
{"-0.1234", "-0.12", -1},
{"-0.12", "-0.1234", 1},
{"-0.12", "-0.1200", 0},
{"-0.1234", "0.1234", -1},
{"-1.234", "-12.34", 1},
{"-0.1234", "-12.34", 1},
@ -661,4 +664,16 @@ func (s *testCodecSuite) TestDecimal(c *C) {
ret := bytes.Compare(b1, b2)
c.Assert(ret, Equals, t.Ret)
}
floats := []float64{-123.45, -123.40, -23.45, -1.43, -0.93, -0.4333, -0.068,
-0.0099, 0, 0.001, 0.0012, 0.12, 1.2, 1.23, 123.3, 2424.242424}
var decs [][]byte
for i := range floats {
dec := mysql.NewDecimalFromFloat(floats[i])
decs = append(decs, EncodeDecimal(nil, dec))
}
for i := 0; i < len(decs)-1; i++ {
cmp := bytes.Compare(decs[i], decs[i+1])
c.Assert(cmp, LessEqual, 0)
}
}

View File

@ -35,35 +35,10 @@ func codecSign(value int64) int64 {
return positiveSign
}
func encodeExp(expValue int64, expSign int64, valSign int64) int64 {
if expSign == negativeSign {
expValue = -expValue
}
if expSign != valSign {
expValue = ^expValue
}
return expValue
}
func decodeExp(expValue int64, expSign int64, valSign int64) int64 {
if expSign != valSign {
expValue = ^expValue
}
if expSign == negativeSign {
expValue = -expValue
}
return expValue
}
// EncodeDecimal encodes a decimal d into a byte slice which can be sorted lexicographically later.
// EncodeDecimal guarantees that the encoded value is in ascending order for comparison.
// Decimal encoding:
// Byte -> value sign
// Byte -> exp sign
// EncodeInt -> exp value
// EncodeBytes -> abs value bytes
func EncodeDecimal(b []byte, d mysql.Decimal) []byte {
@ -97,13 +72,11 @@ func EncodeDecimal(b []byte, d mysql.Decimal) []byte {
}
expVal := exp + int64(d.Exponent())
expSign := codecSign(expVal)
// For negtive exp, do bit reverse for exp.
expVal = encodeExp(expVal, expSign, valSign)
if valSign == negativeSign {
expVal = -expVal
}
b = append(b, byte(valSign))
b = append(b, byte(expSign))
b = EncodeInt(b, expVal)
if valSign == negativeSign {
b = EncodeBytesDesc(b, value)
@ -135,21 +108,17 @@ func DecodeDecimal(b []byte) ([]byte, mysql.Decimal, error) {
return r, d, errors.Trace(err)
}
// Decode exp sign.
expSign := int64(r[0])
r = r[1:]
// Decode exp value.
expVal := int64(0)
r, expVal, err = DecodeInt(r)
if err != nil {
return r, d, errors.Trace(err)
}
expVal = decodeExp(expVal, expSign, valSign)
// Decode abs value bytes.
value := []byte{}
if valSign == negativeSign {
expVal = -expVal
r, value, err = DecodeBytesDesc(r)
} else {
r, value, err = DecodeBytes(r)

View File

@ -14,10 +14,9 @@
package tablecodec
import (
"sort"
"bytes"
"time"
"bytes"
"github.com/golang/protobuf/proto"
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
@ -49,10 +48,10 @@ func EncodeRowKey(tableID int64, encodedHandle []byte) kv.Key {
}
// EncodeColumnKey encodes the table id, row handle and columnID into a kv.Key
func EncodeColumnKey(tableID int64, encodedHandle []byte, columnID int64) kv.Key {
func EncodeColumnKey(tableID int64, handle int64, columnID int64) kv.Key {
buf := make([]byte, 0, recordRowKeyLen+idLen)
buf = appendTableRecordPrefix(buf, tableID)
buf = append(buf, encodedHandle...)
buf = codec.EncodeInt(buf, handle)
buf = codec.EncodeInt(buf, columnID)
return buf
}
@ -85,7 +84,7 @@ func DecodeRowKey(key kv.Key) (handle int64, err error) {
}
// DecodeValues decodes a byte slice into datums with column types.
func DecodeValues(data []byte, fts []*types.FieldType) ([]types.Datum, error) {
func DecodeValues(data []byte, fts []*types.FieldType, inIndex bool) ([]types.Datum, error) {
values, err := codec.Decode(data)
if err != nil {
return nil, errors.Trace(err)
@ -93,6 +92,11 @@ func DecodeValues(data []byte, fts []*types.FieldType) ([]types.Datum, error) {
if len(values) > len(fts) {
return nil, errors.Errorf("invalid column count %d is less than value count %d", len(fts), len(values))
}
if inIndex {
// We don't need to unflatten index columns for now.
return values, nil
}
for i := range values {
values[i], err = unflatten(values[i], fts[i])
if err != nil {
@ -124,7 +128,7 @@ func unflatten(datum types.Datum, ft *types.FieldType) (types.Datum, error) {
if err != nil {
return datum, errors.Trace(err)
}
datum.SetValue(t)
datum.SetMysqlTime(&t)
return datum, nil
case mysql.TypeDuration:
dur := mysql.Duration{Duration: time.Duration(datum.GetInt64())}
@ -160,28 +164,18 @@ func unflatten(datum types.Datum, ft *types.FieldType) (types.Datum, error) {
}
// EncodeIndexSeekKey encodes an index value to kv.Key.
func EncodeIndexSeekKey(tableID int64, encodedValue []byte) kv.Key {
func EncodeIndexSeekKey(tableID int64, idxID int64, encodedValue []byte) kv.Key {
key := make([]byte, 0, prefixLen+len(encodedValue))
key = appendTableIndexPrefix(key, tableID)
key = codec.EncodeInt(key, idxID)
key = append(key, encodedValue...)
return key
}
// IndexRowData extracts the row data and handle in an index key.
// If the index is unique, handle would be nil.
func IndexRowData(key kv.Key, columnCount int) (data, handle []byte, err error) {
b := key[prefixLen:]
// The index key may have primary key appended to the end, we decode the column values
// only to get the column values length.
for columnCount > 0 {
b, _, err = codec.DecodeOne(b)
if err != nil {
return nil, nil, errors.Trace(err)
}
columnCount--
}
handle = b
return key[prefixLen : len(key)-len(handle)], handle, nil
// DecodeIndexKey decodes datums from an index key.
func DecodeIndexKey(key kv.Key) ([]types.Datum, error) {
b := key[prefixLen+idLen:]
return codec.Decode(b)
}
// Record prefix is "t[tableID]_r".
@ -200,17 +194,6 @@ func appendTableIndexPrefix(buf []byte, tableID int64) []byte {
return buf
}
type int64Slice []int64
func (p int64Slice) Len() int { return len(p) }
func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// SortInt64Slice sorts an int64 slice.
func SortInt64Slice(s []int64) {
sort.Sort(int64Slice(s))
}
func columnToProto(c *model.ColumnInfo) *tipb.ColumnInfo {
pc := &tipb.ColumnInfo{
ColumnId: proto.Int64(c.ID),
@ -275,8 +258,8 @@ func IndexToProto(t *model.TableInfo, idx *model.IndexInfo) *tipb.IndexInfo {
Unique: proto.Bool(idx.Unique),
}
cols := make([]*tipb.ColumnInfo, 0, len(idx.Columns))
for _, c := range t.Columns {
cols = append(cols, columnToProto(c))
for _, c := range idx.Columns {
cols = append(cols, columnToProto(t.Columns[c.Offset]))
}
pi.Columns = cols
return pi
@ -298,12 +281,12 @@ func EncodeTableRanges(tid int64, rans []*tipb.KeyRange) []kv.KeyRange {
}
// EncodeIndexRanges encodes index ranges into kv.KeyRanges.
func EncodeIndexRanges(tid int64, rans []*tipb.KeyRange) []kv.KeyRange {
func EncodeIndexRanges(tid, idxID int64, rans []*tipb.KeyRange) []kv.KeyRange {
keyRanges := make([]kv.KeyRange, 0, len(rans))
for _, r := range rans {
// Convert range to kv.KeyRange
start := EncodeIndexSeekKey(tid, r.Low)
end := EncodeIndexSeekKey(tid, r.High)
start := EncodeIndexSeekKey(tid, idxID, r.Low)
end := EncodeIndexSeekKey(tid, idxID, r.High)
nr := kv.KeyRange{
StartKey: start,
EndKey: end,

View File

@ -35,7 +35,7 @@ func (s *tableCodecSuite) TestTableCodec(c *C) {
c.Assert(err, IsNil)
c.Assert(h, Equals, int64(2))
key = EncodeColumnKey(1, codec.EncodeInt(nil, 2), 3)
key = EncodeColumnKey(1, 2, 3)
h, err = DecodeRowKey(key)
c.Assert(err, IsNil)
c.Assert(h, Equals, int64(2))

View File

@ -28,6 +28,7 @@ import (
// SelectResult is used to get response rows from SelectRequest.
type SelectResult struct {
index bool
fields []*types.FieldType
resp kv.Response
}
@ -43,6 +44,7 @@ func (r *SelectResult) Next() (subResult *SubResult, err error) {
return nil, nil
}
subResult = &SubResult{
index: r.index,
fields: r.fields,
reader: reader,
}
@ -56,6 +58,7 @@ func (r *SelectResult) Close() error {
// SubResult represents a subset of select result.
type SubResult struct {
index bool
fields []*types.FieldType
reader io.ReadCloser
resp *tipb.SelectResponse
@ -77,20 +80,24 @@ func (r *SubResult) Next() (handle int64, data []types.Datum, err error) {
if err != nil {
return 0, nil, errors.Trace(err)
}
if r.resp.Error != nil {
return 0, nil, errors.Errorf("[%d %s]", r.resp.Error.GetCode(), r.resp.Error.GetMsg())
}
}
if r.cursor >= len(r.resp.Rows) {
return 0, nil, nil
}
row := r.resp.Rows[r.cursor]
data, err = tablecodec.DecodeValues(row.Data, r.fields)
data, err = tablecodec.DecodeValues(row.Data, r.fields, r.index)
if err != nil {
return 0, nil, errors.Trace(err)
}
handleBytes := row.GetHandle()
_, handle, err = codec.DecodeInt(handleBytes)
datums, err := codec.Decode(handleBytes)
if err != nil {
return 0, nil, errors.Trace(err)
}
handle = datums[0].GetInt64()
r.cursor++
return
}
@ -111,14 +118,14 @@ func Select(client kv.Client, req *tipb.SelectRequest, concurrency int) (*Select
if resp == nil {
return nil, errors.New("client returns nil response")
}
var columns []*tipb.ColumnInfo
result := &SelectResult{resp: resp}
if req.TableInfo != nil {
columns = req.TableInfo.Columns
result.fields = tablecodec.ProtoColumnsToFieldTypes(req.TableInfo.Columns)
} else {
columns = req.IndexInfo.Columns
result.fields = tablecodec.ProtoColumnsToFieldTypes(req.IndexInfo.Columns)
result.index = true
}
fields := tablecodec.ProtoColumnsToFieldTypes(columns)
return &SelectResult{fields: fields, resp: resp}, nil
return result, nil
}
// Convert tipb.Request to kv.Request.
@ -129,7 +136,8 @@ func composeRequest(req *tipb.SelectRequest, concurrency int) (*kv.Request, erro
if req.IndexInfo != nil {
kvReq.Tp = kv.ReqTypeIndex
tid := req.IndexInfo.GetTableId()
kvReq.KeyRanges = tablecodec.EncodeIndexRanges(tid, req.Ranges)
idxID := req.IndexInfo.GetIndexId()
kvReq.KeyRanges = tablecodec.EncodeIndexRanges(tid, idxID, req.Ranges)
} else {
kvReq.Tp = kv.ReqTypeSelect
tid := req.GetTableInfo().GetTableId()