*: Update the return value type of CutRow and CutIndexKey to [][]byte (#2978)

This commit is contained in:
Lynn
2017-04-01 18:53:52 +08:00
committed by GitHub
parent aaa6184670
commit 61be27d4ac
6 changed files with 293 additions and 73 deletions

View File

@ -15,7 +15,10 @@ package xeval
import (
"github.com/juju/errors"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tipb/go-tipb"
@ -48,7 +51,10 @@ const (
// Evaluator evaluates tipb.Expr.
type Evaluator struct {
Row map[int64]types.Datum // column values.
Row map[int64]types.Datum // column values.
ColumnInfos []*tipb.ColumnInfo
fieldTps []*types.FieldType
valueLists map[*tipb.Expr]*decodedValueList
sc *variable.StatementContext
}
@ -58,6 +64,41 @@ func NewEvaluator(sc *variable.StatementContext) *Evaluator {
return &Evaluator{Row: make(map[int64]types.Datum), sc: sc}
}
// SetColumnInfos stores a copy of the cols slice in e.ColumnInfos and rebuilds
// e.fieldTps so that fieldTps[i] is the field type converted from cols[i].
func (e *Evaluator) SetColumnInfos(cols []*tipb.ColumnInfo) {
	infos := make([]*tipb.ColumnInfo, len(cols))
	fieldTps := make([]*types.FieldType, len(cols))
	for i, col := range cols {
		infos[i] = col
		fieldTps[i] = distsql.FieldTypeFromPBColumn(col)
	}
	e.ColumnInfos = infos
	e.fieldTps = fieldTps
}
// SetRowValue loads one row into e.Row; the values will be used for expr evaluation.
//
// colIDs maps a column ID to an offset; the same offset is used to index
// e.ColumnInfos, e.fieldTps and row. A PK-handle column gets a datum built
// from handle (unsigned when the column carries the unsigned flag); every
// other column is decoded from row[offset] with the field type at that offset.
// The first decode failure is returned as a traced error.
func (e *Evaluator) SetRowValue(handle int64, row [][]byte, colIDs map[int64]int) error {
	for colID, offset := range colIDs {
		info := e.ColumnInfos[offset]
		if !info.GetPkHandle() {
			datum, err := tablecodec.DecodeColumnValue(row[offset], e.fieldTps[offset])
			if err != nil {
				return errors.Trace(err)
			}
			e.Row[colID] = datum
			continue
		}
		// The handle itself is the column value; only its signedness varies.
		if mysql.HasUnsignedFlag(uint(info.GetFlag())) {
			e.Row[colID] = types.NewUintDatum(uint64(handle))
			continue
		}
		e.Row[colID] = types.NewIntDatum(handle)
	}
	return nil
}
type decodedValueList struct {
values []types.Datum
hasNull bool

View File

@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/distsql/xeval"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
@ -403,10 +404,42 @@ func (h *rpcHandler) getRowsFromRange(ctx *selectContext, ran kv.KeyRange, limit
// 2. Checks if it fit where condition.
// 3. Update aggregate functions.
func (h *rpcHandler) handleRowData(ctx *selectContext, handle int64, value []byte) ([]byte, error) {
values, err := handleRowData(ctx.sel.TableInfo.Columns, ctx.colTps, handle, value)
columns := ctx.sel.TableInfo.Columns
values, err := getRowVals(value, ctx.colTps)
if err != nil {
return nil, errors.Trace(err)
}
// Fill the handle and null columns.
for _, col := range columns {
if col.GetPkHandle() {
var handleDatum types.Datum
if mysql.HasUnsignedFlag(uint(col.GetFlag())) {
// PK column is Unsigned.
handleDatum = types.NewUintDatum(uint64(handle))
} else {
handleDatum = types.NewIntDatum(handle)
}
handleData, err1 := codec.EncodeValue(nil, handleDatum)
if err1 != nil {
return nil, errors.Trace(err1)
}
values[col.GetColumnId()] = handleData
continue
}
_, ok := values[col.GetColumnId()]
if ok {
continue
}
if len(col.DefaultVal) > 0 {
values[col.GetColumnId()] = col.DefaultVal
continue
}
if mysql.HasNotNullFlag(uint(col.GetFlag())) {
return nil, errors.Errorf("Miss column %d", col.GetColumnId())
}
values[col.GetColumnId()] = []byte{codec.NilFlag}
}
return h.valuesToRow(ctx, handle, values)
}
@ -446,7 +479,7 @@ func (h *rpcHandler) valuesToRow(ctx *selectContext, handle int64, values map[in
return data, nil
}
func getRowData(value []byte, colTps map[int64]*types.FieldType) (map[int64][]byte, error) {
func getRowVals(value []byte, colTps map[int64]*types.FieldType) (map[int64][]byte, error) {
res, err := tablecodec.CutRow(value, colTps)
if err != nil {
return nil, errors.Trace(err)
@ -612,3 +645,25 @@ func decodeHandle(data []byte) (int64, error) {
err := binary.Read(buf, binary.BigEndian, &h)
return h, errors.Trace(err)
}
// setColumnValueToEval fills eval.Row with one datum per entry of cols; the
// values will be used for expr evaluation.
//
// A PK-handle column gets a datum built from handle (unsigned when the column
// carries the unsigned flag). Any other column is decoded from row[colID]
// using the field type derived from its column info. The first decode failure
// is returned as a traced error.
func setColumnValueToEval(eval *xeval.Evaluator, handle int64, row map[int64][]byte, cols map[int64]*tipb.ColumnInfo) error {
	for colID, info := range cols {
		if !info.GetPkHandle() {
			fieldTp := distsql.FieldTypeFromPBColumn(info)
			datum, err := tablecodec.DecodeColumnValue(row[colID], fieldTp)
			if err != nil {
				return errors.Trace(err)
			}
			eval.Row[colID] = datum
			continue
		}
		// The handle itself is the column value; only its signedness varies.
		if mysql.HasUnsignedFlag(uint(info.GetFlag())) {
			eval.Row[colID] = types.NewUintDatum(uint64(handle))
			continue
		}
		eval.Row[colID] = types.NewIntDatum(handle)
	}
	return nil
}

View File

@ -17,11 +17,9 @@ import (
"github.com/golang/protobuf/proto"
"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/distsql/xeval"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tipb/go-tipb"
)
@ -36,11 +34,6 @@ type dagContext struct {
columns []*tipb.ColumnInfo
}
func (ctx *dagContext) setColumns(columns []*tipb.ColumnInfo) {
ctx.columns = make([]*tipb.ColumnInfo, len(columns))
copy(ctx.columns, columns)
}
func (h *rpcHandler) handleCopDAGRequest(req *coprocessor.Request) (*coprocessor.Response, error) {
resp := &coprocessor.Response{}
if len(req.Ranges) == 0 {
@ -80,8 +73,8 @@ func (h *rpcHandler) handleCopDAGRequest(req *coprocessor.Request) (*coprocessor
break
}
data := dummySlice
for _, col := range ctx.columns {
data = append(data, row[col.GetColumnId()]...)
for _, val := range row {
data = append(data, val...)
}
chunks = appendRow(chunks, handle, data)
}
@ -93,19 +86,16 @@ func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor,
switch curr.GetTp() {
case tipb.ExecType_TypeTableScan:
columns := curr.TblScan.Columns
ctx.setColumns(columns)
colTps := make(map[int64]*types.FieldType, len(columns))
for _, col := range columns {
if col.GetPkHandle() {
continue
}
colTps[col.GetColumnId()] = distsql.FieldTypeFromPBColumn(col)
ctx.eval.SetColumnInfos(columns)
colIDs := make(map[int64]int)
for i, col := range columns {
colIDs[col.GetColumnId()] = i
}
ranges := h.extractKVRanges(ctx.keyRanges, *curr.TblScan.Desc)
currExec = &tableScanExec{
TableScan: curr.TblScan,
kvRanges: ranges,
colTps: colTps,
colsID: colIDs,
startTS: ctx.dagReq.GetStartTs(),
mvccStore: h.mvccStore,
rawStartKey: h.rawStartKey,
@ -113,21 +103,17 @@ func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor,
}
case tipb.ExecType_TypeIndexScan:
columns := curr.IdxScan.Columns
ctx.setColumns(columns)
ctx.eval.SetColumnInfos(columns)
length := len(columns)
// The PKHandle column info has been collected in ctx.
if columns[length-1].GetPkHandle() {
columns = columns[:length-1]
}
ids := make([]int64, 0, len(columns))
for _, col := range columns {
ids = append(ids, col.GetColumnId())
}
ranges := h.extractKVRanges(ctx.keyRanges, *curr.IdxScan.Desc)
currExec = &indexScanExec{
IndexScan: curr.IdxScan,
kvRanges: ranges,
ids: ids,
colsLen: len(columns),
startTS: ctx.dagReq.GetStartTs(),
mvccStore: h.mvccStore,
rawStartKey: h.rawStartKey,
@ -139,15 +125,15 @@ func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor,
if len(curr.Selection.Conditions) > 0 {
cond = curr.Selection.Conditions[0]
}
cols := make(map[int64]*tipb.ColumnInfo)
err := extractColumnsInExpr(cond, ctx.columns, cols)
colIDs := make(map[int64]int)
err := extractColIDsInExpr(cond, ctx.eval.ColumnInfos, colIDs)
if err != nil {
return nil, errors.Trace(err)
}
currExec = &selectionExec{
Selection: curr.Selection,
eval: ctx.eval,
columns: cols,
colsID: colIDs,
sc: ctx.sc,
}
default:

View File

@ -17,7 +17,6 @@ import (
"bytes"
"github.com/juju/errors"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/distsql/xeval"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/mysql"
@ -30,12 +29,12 @@ import (
type executor interface {
SetSrcExec(executor)
Next() (int64, map[int64][]byte, error)
Next() (int64, [][]byte, error)
}
type tableScanExec struct {
*tipb.TableScan
colTps map[int64]*types.FieldType
colsID map[int64]int
kvRanges []kv.KeyRange
startTS uint64
mvccStore *MvccStore
@ -51,7 +50,7 @@ func (e *tableScanExec) SetSrcExec(exec executor) {
e.src = exec
}
func (e *tableScanExec) Next() (int64, map[int64][]byte, error) {
func (e *tableScanExec) Next() (int64, [][]byte, error) {
for e.cursor < len(e.kvRanges) {
ran := e.kvRanges[e.cursor]
if ran.IsPoint() {
@ -79,7 +78,7 @@ func (e *tableScanExec) Next() (int64, map[int64][]byte, error) {
return 0, nil, nil
}
func (e *tableScanExec) getRowFromPoint(ran kv.KeyRange) (int64, map[int64][]byte, error) {
func (e *tableScanExec) getRowFromPoint(ran kv.KeyRange) (int64, [][]byte, error) {
val, err := e.mvccStore.Get(ran.StartKey, e.startTS)
if len(val) == 0 {
return 0, nil, nil
@ -90,14 +89,14 @@ func (e *tableScanExec) getRowFromPoint(ran kv.KeyRange) (int64, map[int64][]byt
if err != nil {
return 0, nil, errors.Trace(err)
}
row, err := handleRowData(e.Columns, e.colTps, handle, val)
row, err := getRowData(e.Columns, e.colsID, handle, val)
if err != nil {
return 0, nil, errors.Trace(err)
}
return handle, row, nil
}
func (e *tableScanExec) getRowFromRange(ran kv.KeyRange) (int64, map[int64][]byte, error) {
func (e *tableScanExec) getRowFromRange(ran kv.KeyRange) (int64, [][]byte, error) {
if e.seekKey == nil {
startKey := maxStartKey(ran.StartKey, e.rawStartKey)
endKey := minEndKey(ran.EndKey, e.rawEndKey)
@ -143,7 +142,7 @@ func (e *tableScanExec) getRowFromRange(ran kv.KeyRange) (int64, map[int64][]byt
if err != nil {
return 0, nil, errors.Trace(err)
}
row, err := handleRowData(e.Columns, e.colTps, handle, pair.Value)
row, err := getRowData(e.Columns, e.colsID, handle, pair.Value)
if err != nil {
return 0, nil, errors.Trace(err)
}
@ -152,7 +151,7 @@ func (e *tableScanExec) getRowFromRange(ran kv.KeyRange) (int64, map[int64][]byt
type indexScanExec struct {
*tipb.IndexScan
ids []int64
colsLen int
kvRanges []kv.KeyRange
startTS uint64
mvccStore *MvccStore
@ -168,7 +167,7 @@ func (e *indexScanExec) SetSrcExec(exec executor) {
e.src = exec
}
func (e *indexScanExec) Next() (int64, map[int64][]byte, error) {
func (e *indexScanExec) Next() (int64, [][]byte, error) {
for e.cursor < len(e.kvRanges) {
ran := e.kvRanges[e.cursor]
handle, value, err := e.getRowFromRange(ran)
@ -186,7 +185,7 @@ func (e *indexScanExec) Next() (int64, map[int64][]byte, error) {
return 0, nil, nil
}
func (e *indexScanExec) getRowFromRange(ran kv.KeyRange) (int64, map[int64][]byte, error) {
func (e *indexScanExec) getRowFromRange(ran kv.KeyRange) (int64, [][]byte, error) {
if e.seekKey == nil {
startKey := maxStartKey(ran.StartKey, e.rawStartKey)
endKey := minEndKey(ran.EndKey, e.rawEndKey)
@ -228,7 +227,7 @@ func (e *indexScanExec) getRowFromRange(ran kv.KeyRange) (int64, map[int64][]byt
e.seekKey = []byte(kv.Key(pair.Key).PrefixNext())
}
values, b, err := tablecodec.CutIndexKey(pair.Key, e.ids)
values, b, err := tablecodec.CutIndexKeyNew(pair.Key, e.colsLen)
var handle int64
if len(b) > 0 {
var handleDatum types.Datum
@ -249,9 +248,9 @@ func (e *indexScanExec) getRowFromRange(ran kv.KeyRange) (int64, map[int64][]byt
type selectionExec struct {
*tipb.Selection
sc *variable.StatementContext
eval *xeval.Evaluator
columns map[int64]*tipb.ColumnInfo
sc *variable.StatementContext
eval *xeval.Evaluator
colsID map[int64]int
src executor
}
@ -260,7 +259,7 @@ func (e *selectionExec) SetSrcExec(exec executor) {
e.src = exec
}
func (e *selectionExec) Next() (int64, map[int64][]byte, error) {
func (e *selectionExec) Next() (int64, [][]byte, error) {
for {
handle, row, err := e.src.Next()
if err != nil {
@ -270,7 +269,7 @@ func (e *selectionExec) Next() (int64, map[int64][]byte, error) {
return 0, nil, nil
}
err = setColumnValueToEval(e.eval, handle, row, e.columns)
err = e.eval.SetRowValue(handle, row, e.colsID)
if err != nil {
return 0, nil, errors.Trace(err)
}
@ -292,15 +291,27 @@ func (e *selectionExec) Next() (int64, map[int64][]byte, error) {
}
}
// handleRowData deals with raw row data:
// 1. Decodes row from raw byte slice.
func handleRowData(columns []*tipb.ColumnInfo, colTps map[int64]*types.FieldType, handle int64, value []byte) (map[int64][]byte, error) {
values, err := getRowData(value, colTps)
// hasColVal reports whether id is present in colIDs and the entry of data at
// the mapped offset is non-nil.
func hasColVal(data [][]byte, colIDs map[int64]int, id int64) bool {
	if offset, found := colIDs[id]; found {
		return data[offset] != nil
	}
	return false
}
// getRowData decodes raw byte slice to row data.
func getRowData(columns []*tipb.ColumnInfo, colIDs map[int64]int, handle int64, value []byte) ([][]byte, error) {
values, err := tablecodec.CutRowNew(value, colIDs)
if err != nil {
return nil, errors.Trace(err)
}
if values == nil {
values = make([][]byte, len(colIDs))
}
// Fill the handle and null columns.
for _, col := range columns {
id := col.GetColumnId()
offset := colIDs[id]
if col.GetPkHandle() {
var handleDatum types.Datum
if mysql.HasUnsignedFlag(uint(col.GetFlag())) {
@ -313,43 +324,46 @@ func handleRowData(columns []*tipb.ColumnInfo, colTps map[int64]*types.FieldType
if err1 != nil {
return nil, errors.Trace(err1)
}
values[col.GetColumnId()] = handleData
values[offset] = handleData
continue
}
_, ok := values[col.GetColumnId()]
if ok {
if hasColVal(values, colIDs, id) {
continue
}
if len(col.DefaultVal) > 0 {
values[col.GetColumnId()] = col.DefaultVal
values[offset] = col.DefaultVal
continue
}
if mysql.HasNotNullFlag(uint(col.GetFlag())) {
return nil, errors.New("Miss column")
return nil, errors.Errorf("Miss column %d", id)
}
values[col.GetColumnId()] = []byte{codec.NilFlag}
values[offset] = []byte{codec.NilFlag}
}
return values, nil
}
// setColumnValueToEval puts column values into evaluator, the values will be used for expr evaluation.
func setColumnValueToEval(eval *xeval.Evaluator, handle int64, row map[int64][]byte, cols map[int64]*tipb.ColumnInfo) error {
for colID, col := range cols {
if col.GetPkHandle() {
if mysql.HasUnsignedFlag(uint(col.GetFlag())) {
eval.Row[colID] = types.NewUintDatum(uint64(handle))
} else {
eval.Row[colID] = types.NewIntDatum(handle)
func extractColIDsInExpr(expr *tipb.Expr, columns []*tipb.ColumnInfo, collector map[int64]int) error {
if expr == nil {
return nil
}
if expr.GetTp() == tipb.ExprType_ColumnRef {
_, i, err := codec.DecodeInt(expr.Val)
if err != nil {
return errors.Trace(err)
}
for idx, c := range columns {
if c.GetColumnId() == i {
collector[i] = idx
return nil
}
} else {
data := row[colID]
ft := distsql.FieldTypeFromPBColumn(col)
datum, err := tablecodec.DecodeColumnValue(data, ft)
if err != nil {
return errors.Trace(err)
}
eval.Row[colID] = datum
}
return xeval.ErrInvalid.Gen("column %d not found", i)
}
for _, child := range expr.Children {
err := extractColIDsInExpr(child, columns, collector)
if err != nil {
return errors.Trace(err)
}
}
return nil

View File

@ -298,7 +298,48 @@ func DecodeRow(b []byte, cols map[int64]*types.FieldType) (map[int64]types.Datum
return row, nil
}
// CutRow cut encoded row into byte slices and return interested columns' byte slice.
// CutRowNew cuts encoded row into byte slices and return columns' byte slice.
// Row layout: colID1, value1, colID2, value2, .....
func CutRowNew(data []byte, colIDs map[int64]int) ([][]byte, error) {
if data == nil {
return nil, nil
}
if len(data) == 1 && data[0] == codec.NilFlag {
return nil, nil
}
var (
cnt int
b []byte
err error
)
row := make([][]byte, len(colIDs))
for len(data) > 0 && cnt < len(colIDs) {
// Get col id.
b, data, err = codec.CutOne(data)
if err != nil {
return nil, errors.Trace(err)
}
_, cid, err := codec.DecodeOne(b)
if err != nil {
return nil, errors.Trace(err)
}
// Get col value.
b, data, err = codec.CutOne(data)
if err != nil {
return nil, errors.Trace(err)
}
id := cid.GetInt64()
offset, ok := colIDs[id]
if ok {
row[offset] = b
cnt++
}
}
return row, nil
}
// CutRow cuts encoded row into byte slices and return interested columns' byte slice.
// Row layout: colID1, value1, colID2, value2, .....
func CutRow(data []byte, cols map[int64]*types.FieldType) (map[int64][]byte, error) {
if data == nil {
@ -421,6 +462,23 @@ func CutIndexKey(key kv.Key, colIDs []int64) (values map[int64][]byte, b []byte,
return
}
// CutIndexKeyNew cuts encoded index key into colIDs to bytes slices.
// The returned value b is the remaining bytes of the key which would be empty if it is unique index or handle data
// if it is non-unique index.
func CutIndexKeyNew(key kv.Key, length int) (values [][]byte, b []byte, err error) {
b = key[prefixLen+idLen:]
values = make([][]byte, 0, length)
for i := 0; i < length; i++ {
var val []byte
val, b, err = codec.CutOne(b)
if err != nil {
return nil, nil, errors.Trace(err)
}
values = append(values, val)
}
return
}
// EncodeTableIndexPrefix encodes index prefix with tableID and idxID.
func EncodeTableIndexPrefix(tableID, idxID int64) kv.Key {
key := make([]byte, 0, prefixLen)

View File

@ -178,6 +178,72 @@ func (s *testTableCodecSuite) TestTimeCodec(c *C) {
}
}
// TestCutRow encodes a three-column row with EncodeRow and checks that
// CutRowNew returns, at each column's offset, exactly the bytes EncodeValue
// produces for that column's datum.
func (s *testTableCodecSuite) TestCutRow(c *C) {
	defer testleak.AfterTest(c)()

	cols := []*column{
		{id: 1, tp: types.NewFieldType(mysql.TypeLonglong)},
		{id: 2, tp: types.NewFieldType(mysql.TypeVarchar)},
		{id: 3, tp: types.NewFieldType(mysql.TypeNewDecimal)},
	}
	row := []types.Datum{
		types.NewIntDatum(100),
		types.NewBytesDatum([]byte("abc")),
		types.NewDecimalDatum(types.NewDecFromInt(1)),
	}

	// Expected per-column bytes.
	data := make([][]byte, len(row))
	for i := range row {
		encoded, err := EncodeValue(row[i])
		c.Assert(err, IsNil)
		data[i] = encoded
	}

	// Column IDs for encoding, and the ID -> offset map for cutting.
	colIDs := make([]int64, 0, len(cols))
	colMap := make(map[int64]int, len(cols))
	for i, col := range cols {
		colIDs = append(colIDs, col.id)
		colMap[col.id] = i
	}

	// Encode
	bs, err := EncodeRow(row, colIDs)
	c.Assert(err, IsNil)
	c.Assert(bs, NotNil)

	// Decode
	r, err := CutRowNew(bs, colMap)
	c.Assert(err, IsNil)
	c.Assert(r, NotNil)
	c.Assert(r, HasLen, 3)

	// Compare cut row and original row
	for i := range colIDs {
		c.Assert(r[i], DeepEquals, data[i])
	}
}
// TestCutKeyNew builds an index key from three column datums followed by a
// handle datum, cuts it with CutIndexKeyNew, and checks that each cut value
// decodes back to the original datum and that the remaining bytes decode to
// the handle.
func (s *testTableCodecSuite) TestCutKeyNew(c *C) {
	values := []types.Datum{types.NewIntDatum(1), types.NewBytesDatum([]byte("abc")), types.NewFloat64Datum(5.5)}
	handle := types.NewIntDatum(100)
	values = append(values, handle)
	encodedValue, err := codec.EncodeKey(nil, values...)
	c.Assert(err, IsNil)
	tableID := int64(4)
	indexID := int64(5)
	indexKey := EncodeIndexSeekKey(tableID, indexID, encodedValue)
	valuesBytes, handleBytes, err := CutIndexKeyNew(indexKey, 3)
	c.Assert(err, IsNil)
	// Guard the indexing below so a short result fails the assertion instead of panicking.
	c.Assert(valuesBytes, HasLen, 3)
	for i := 0; i < 3; i++ {
		// A decode error must fail the test on its own rather than surface as a datum mismatch.
		_, val, err := codec.DecodeOne(valuesBytes[i])
		c.Assert(err, IsNil)
		c.Assert(val, DeepEquals, values[i])
	}
	_, handleVal, err := codec.DecodeOne(handleBytes)
	c.Assert(err, IsNil)
	c.Assert(handleVal, DeepEquals, types.NewIntDatum(100))
}
func (s *testTableCodecSuite) TestCutKey(c *C) {
colIDs := []int64{1, 2, 3}
values := []types.Datum{types.NewIntDatum(1), types.NewBytesDatum([]byte("abc")), types.NewFloat64Datum(5.5)}