diff --git a/distsql/xeval/eval.go b/distsql/xeval/eval.go index 18ae9fd22f..b38bde24a7 100644 --- a/distsql/xeval/eval.go +++ b/distsql/xeval/eval.go @@ -15,7 +15,10 @@ package xeval import ( "github.com/juju/errors" + "github.com/pingcap/tidb/distsql" + "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util/types" "github.com/pingcap/tipb/go-tipb" @@ -48,7 +51,10 @@ const ( // Evaluator evaluates tipb.Expr. type Evaluator struct { - Row map[int64]types.Datum // column values. + Row map[int64]types.Datum // column values. + ColumnInfos []*tipb.ColumnInfo + + fieldTps []*types.FieldType valueLists map[*tipb.Expr]*decodedValueList sc *variable.StatementContext } @@ -58,6 +64,41 @@ func NewEvaluator(sc *variable.StatementContext) *Evaluator { return &Evaluator{Row: make(map[int64]types.Datum), sc: sc} } +// SetColumnInfos sets ColumnInfos. +func (e *Evaluator) SetColumnInfos(cols []*tipb.ColumnInfo) { + e.ColumnInfos = make([]*tipb.ColumnInfo, len(cols)) + copy(e.ColumnInfos, cols) + + e.fieldTps = make([]*types.FieldType, 0, len(e.ColumnInfos)) + for _, col := range e.ColumnInfos { + ft := distsql.FieldTypeFromPBColumn(col) + e.fieldTps = append(e.fieldTps, ft) + } +} + +// SetRowValue puts row value into evaluator, the values will be used for expr evaluation. +func (e *Evaluator) SetRowValue(handle int64, row [][]byte, colIDs map[int64]int) error { + for colID, offset := range colIDs { + col := e.ColumnInfos[offset] + if col.GetPkHandle() { + if mysql.HasUnsignedFlag(uint(col.GetFlag())) { + e.Row[colID] = types.NewUintDatum(uint64(handle)) + } else { + e.Row[colID] = types.NewIntDatum(handle) + } + } else { + data := row[offset] + ft := e.fieldTps[offset] + datum, err := tablecodec.DecodeColumnValue(data, ft) + if err != nil { + return errors.Trace(err) + } + e.Row[colID] = datum + } + } + return nil +} + type decodedValueList struct { values []types.Datum hasNull bool diff --git a/store/tikv/mock-tikv/cop_handler.go b/store/tikv/mock-tikv/cop_handler.go index e2cee837ec..43f53bf1f6 100644 --- a/store/tikv/mock-tikv/cop_handler.go +++ b/store/tikv/mock-tikv/cop_handler.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/distsql/xeval" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" @@ -403,10 +404,42 @@ func (h *rpcHandler) getRowsFromRange(ctx *selectContext, ran kv.KeyRange, limit // 2. Checks if it fit where condition. // 3. Update aggregate functions. func (h *rpcHandler) handleRowData(ctx *selectContext, handle int64, value []byte) ([]byte, error) { - values, err := handleRowData(ctx.sel.TableInfo.Columns, ctx.colTps, handle, value) + columns := ctx.sel.TableInfo.Columns + values, err := getRowVals(value, ctx.colTps) if err != nil { return nil, errors.Trace(err) } + // Fill the handle and null columns. + for _, col := range columns { + if col.GetPkHandle() { + var handleDatum types.Datum + if mysql.HasUnsignedFlag(uint(col.GetFlag())) { + // PK column is Unsigned. + handleDatum = types.NewUintDatum(uint64(handle)) + } else { + handleDatum = types.NewIntDatum(handle) + } + handleData, err1 := codec.EncodeValue(nil, handleDatum) + if err1 != nil { + return nil, errors.Trace(err1) + } + values[col.GetColumnId()] = handleData + continue + } + _, ok := values[col.GetColumnId()] + if ok { + continue + } + if len(col.DefaultVal) > 0 { + values[col.GetColumnId()] = col.DefaultVal + continue + } + if mysql.HasNotNullFlag(uint(col.GetFlag())) { + return nil, errors.Errorf("Miss column %d", col.GetColumnId()) + } + values[col.GetColumnId()] = []byte{codec.NilFlag} + } + return h.valuesToRow(ctx, handle, values) } @@ -446,7 +479,7 @@ func (h *rpcHandler) valuesToRow(ctx *selectContext, handle int64, values map[in return data, nil } -func getRowData(value []byte, colTps map[int64]*types.FieldType) (map[int64][]byte, error) { +func getRowVals(value []byte, colTps map[int64]*types.FieldType) (map[int64][]byte, error) { res, err := tablecodec.CutRow(value, colTps) if err != nil { return nil, errors.Trace(err) @@ -612,3 +645,25 @@ func decodeHandle(data []byte) (int64, error) { err := binary.Read(buf, binary.BigEndian, &h) return h, errors.Trace(err) } + +// setColumnValueToEval puts column values into evaluator, the values will be used for expr evaluation. +func setColumnValueToEval(eval *xeval.Evaluator, handle int64, row map[int64][]byte, cols map[int64]*tipb.ColumnInfo) error { + for colID, col := range cols { + if col.GetPkHandle() { + if mysql.HasUnsignedFlag(uint(col.GetFlag())) { + eval.Row[colID] = types.NewUintDatum(uint64(handle)) + } else { + eval.Row[colID] = types.NewIntDatum(handle) + } + } else { + data := row[colID] + ft := distsql.FieldTypeFromPBColumn(col) + datum, err := tablecodec.DecodeColumnValue(data, ft) + if err != nil { + return errors.Trace(err) + } + eval.Row[colID] = datum + } + } + return nil +} diff --git a/store/tikv/mock-tikv/cop_handler_dag.go b/store/tikv/mock-tikv/cop_handler_dag.go index 6af2e4c1a6..92ea0ccada 100644 --- a/store/tikv/mock-tikv/cop_handler_dag.go +++ b/store/tikv/mock-tikv/cop_handler_dag.go @@ -17,11 +17,9 @@ import ( "github.com/golang/protobuf/proto" "github.com/juju/errors" "github.com/pingcap/kvproto/pkg/coprocessor" - "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/distsql/xeval" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/util/types" "github.com/pingcap/tipb/go-tipb" ) @@ -36,11 +34,6 @@ type dagContext struct { columns []*tipb.ColumnInfo } -func (ctx *dagContext) setColumns(columns []*tipb.ColumnInfo) { - ctx.columns = make([]*tipb.ColumnInfo, len(columns)) - copy(ctx.columns, columns) -} - func (h *rpcHandler) handleCopDAGRequest(req *coprocessor.Request) (*coprocessor.Response, error) { resp := &coprocessor.Response{} if len(req.Ranges) == 0 { @@ -80,8 +73,8 @@ func (h *rpcHandler) handleCopDAGRequest(req *coprocessor.Request) (*coprocessor break } data := dummySlice - for _, col := range ctx.columns { - data = append(data, row[col.GetColumnId()]...) + for _, val := range row { + data = append(data, val...) } chunks = appendRow(chunks, handle, data) } @@ -93,19 +86,16 @@ func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, switch curr.GetTp() { case tipb.ExecType_TypeTableScan: columns := curr.TblScan.Columns - ctx.setColumns(columns) - colTps := make(map[int64]*types.FieldType, len(columns)) - for _, col := range columns { - if col.GetPkHandle() { - continue - } - colTps[col.GetColumnId()] = distsql.FieldTypeFromPBColumn(col) + ctx.eval.SetColumnInfos(columns) + colIDs := make(map[int64]int) + for i, col := range columns { + colIDs[col.GetColumnId()] = i } ranges := h.extractKVRanges(ctx.keyRanges, *curr.TblScan.Desc) currExec = &tableScanExec{ TableScan: curr.TblScan, kvRanges: ranges, - colTps: colTps, + colsID: colIDs, startTS: ctx.dagReq.GetStartTs(), mvccStore: h.mvccStore, rawStartKey: h.rawStartKey, @@ -113,21 +103,17 @@ func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, } case tipb.ExecType_TypeIndexScan: columns := curr.IdxScan.Columns - ctx.setColumns(columns) + ctx.eval.SetColumnInfos(columns) length := len(columns) // The PKHandle column info has been collected in ctx. if columns[length-1].GetPkHandle() { columns = columns[:length-1] } - ids := make([]int64, 0, len(columns)) - for _, col := range columns { - ids = append(ids, col.GetColumnId()) - } ranges := h.extractKVRanges(ctx.keyRanges, *curr.IdxScan.Desc) currExec = &indexScanExec{ IndexScan: curr.IdxScan, kvRanges: ranges, - ids: ids, + colsLen: len(columns), startTS: ctx.dagReq.GetStartTs(), mvccStore: h.mvccStore, rawStartKey: h.rawStartKey, @@ -139,15 +125,15 @@ func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, if len(curr.Selection.Conditions) > 0 { cond = curr.Selection.Conditions[0] } - cols := make(map[int64]*tipb.ColumnInfo) - err := extractColumnsInExpr(cond, ctx.columns, cols) + colIDs := make(map[int64]int) + err := extractColIDsInExpr(cond, ctx.eval.ColumnInfos, colIDs) if err != nil { return nil, errors.Trace(err) } currExec = &selectionExec{ Selection: curr.Selection, eval: ctx.eval, - columns: cols, + colsID: colIDs, sc: ctx.sc, } default: diff --git a/store/tikv/mock-tikv/executor.go b/store/tikv/mock-tikv/executor.go index b2c4a6ac07..9ad5a2e85c 100644 --- a/store/tikv/mock-tikv/executor.go +++ b/store/tikv/mock-tikv/executor.go @@ -17,7 +17,6 @@ import ( "bytes" "github.com/juju/errors" - "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/distsql/xeval" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/mysql" @@ -30,12 +29,12 @@ import ( type executor interface { SetSrcExec(executor) - Next() (int64, map[int64][]byte, error) + Next() (int64, [][]byte, error) } type tableScanExec struct { *tipb.TableScan - colTps map[int64]*types.FieldType + colsID map[int64]int kvRanges []kv.KeyRange startTS uint64 mvccStore *MvccStore @@ -51,7 +50,7 @@ func (e *tableScanExec) SetSrcExec(exec executor) { e.src = exec } -func (e *tableScanExec) Next() (int64, map[int64][]byte, error) { +func (e *tableScanExec) Next() (int64, [][]byte, error) { for e.cursor < len(e.kvRanges) { ran := e.kvRanges[e.cursor] if ran.IsPoint() { @@ -79,7 +78,7 @@ func (e *tableScanExec) Next() (int64, map[int64][]byte, error) { return 0, nil, nil } -func (e *tableScanExec) getRowFromPoint(ran kv.KeyRange) (int64, map[int64][]byte, error) { +func (e *tableScanExec) getRowFromPoint(ran kv.KeyRange) (int64, [][]byte, error) { val, err := e.mvccStore.Get(ran.StartKey, e.startTS) if len(val) == 0 { return 0, nil, nil @@ -90,14 +89,14 @@ func (e *tableScanExec) getRowFromPoint(ran kv.KeyRange) (int64, map[int64][]byt if err != nil { return 0, nil, errors.Trace(err) } - row, err := handleRowData(e.Columns, e.colTps, handle, val) + row, err := getRowData(e.Columns, e.colsID, handle, val) if err != nil { return 0, nil, errors.Trace(err) } return handle, row, nil } -func (e *tableScanExec) getRowFromRange(ran kv.KeyRange) (int64, map[int64][]byte, error) { +func (e *tableScanExec) getRowFromRange(ran kv.KeyRange) (int64, [][]byte, error) { if e.seekKey == nil { startKey := maxStartKey(ran.StartKey, e.rawStartKey) endKey := minEndKey(ran.EndKey, e.rawEndKey) @@ -143,7 +142,7 @@ func (e *tableScanExec) getRowFromRange(ran kv.KeyRange) (int64, map[int64][]byt if err != nil { return 0, nil, errors.Trace(err) } - row, err := handleRowData(e.Columns, e.colTps, handle, pair.Value) + row, err := getRowData(e.Columns, e.colsID, handle, pair.Value) if err != nil { return 0, nil, errors.Trace(err) } @@ -152,7 +151,7 @@ func (e *tableScanExec) getRowFromRange(ran kv.KeyRange) (int64, map[int64][]byt type indexScanExec struct { *tipb.IndexScan - ids []int64 + colsLen int kvRanges []kv.KeyRange startTS uint64 mvccStore *MvccStore @@ -168,7 +167,7 @@ func (e *indexScanExec) SetSrcExec(exec executor) { e.src = exec } -func (e *indexScanExec) Next() (int64, map[int64][]byte, error) { +func (e *indexScanExec) Next() (int64, [][]byte, error) { for e.cursor < len(e.kvRanges) { ran := e.kvRanges[e.cursor] handle, value, err := e.getRowFromRange(ran) @@ -186,7 +185,7 @@ func (e *indexScanExec) Next() (int64, map[int64][]byte, error) { return 0, nil, nil } -func (e *indexScanExec) getRowFromRange(ran kv.KeyRange) (int64, map[int64][]byte, error) { +func (e *indexScanExec) getRowFromRange(ran kv.KeyRange) (int64, [][]byte, error) { if e.seekKey == nil { startKey := maxStartKey(ran.StartKey, e.rawStartKey) endKey := minEndKey(ran.EndKey, e.rawEndKey) @@ -228,7 +227,7 @@ func (e *indexScanExec) getRowFromRange(ran kv.KeyRange) (int64, map[int64][]byt e.seekKey = []byte(kv.Key(pair.Key).PrefixNext()) } - values, b, err := tablecodec.CutIndexKey(pair.Key, e.ids) + values, b, err := tablecodec.CutIndexKeyNew(pair.Key, e.colsLen) var handle int64 if len(b) > 0 { var handleDatum types.Datum @@ -249,9 +248,9 @@ func (e *indexScanExec) getRowFromRange(ran kv.KeyRange) (int64, map[int64][]byt type selectionExec struct { *tipb.Selection - sc *variable.StatementContext - eval *xeval.Evaluator - columns map[int64]*tipb.ColumnInfo + sc *variable.StatementContext + eval *xeval.Evaluator + colsID map[int64]int src executor } @@ -260,7 +259,7 @@ func (e *selectionExec) SetSrcExec(exec executor) { e.src = exec } -func (e *selectionExec) Next() (int64, map[int64][]byte, error) { +func (e *selectionExec) Next() (int64, [][]byte, error) { for { handle, row, err := e.src.Next() if err != nil { @@ -270,7 +269,7 @@ func (e *selectionExec) Next() (int64, map[int64][]byte, error) { return 0, nil, nil } - err = setColumnValueToEval(e.eval, handle, row, e.columns) + err = e.eval.SetRowValue(handle, row, e.colsID) if err != nil { return 0, nil, errors.Trace(err) } @@ -292,15 +291,27 @@ func (e *selectionExec) Next() (int64, map[int64][]byte, error) { } } -// handleRowData deals with raw row data: -// 1. Decodes row from raw byte slice. -func handleRowData(columns []*tipb.ColumnInfo, colTps map[int64]*types.FieldType, handle int64, value []byte) (map[int64][]byte, error) { - values, err := getRowData(value, colTps) +func hasColVal(data [][]byte, colIDs map[int64]int, id int64) bool { + offset, ok := colIDs[id] + if ok && data[offset] != nil { + return true + } + return false +} + +// getRowData decodes raw byte slice to row data. +func getRowData(columns []*tipb.ColumnInfo, colIDs map[int64]int, handle int64, value []byte) ([][]byte, error) { + values, err := tablecodec.CutRowNew(value, colIDs) if err != nil { return nil, errors.Trace(err) } + if values == nil { + values = make([][]byte, len(colIDs)) + } // Fill the handle and null columns. for _, col := range columns { + id := col.GetColumnId() + offset := colIDs[id] if col.GetPkHandle() { var handleDatum types.Datum if mysql.HasUnsignedFlag(uint(col.GetFlag())) { @@ -313,43 +324,46 @@ func handleRowData(columns []*tipb.ColumnInfo, colTps map[int64]*types.FieldType if err1 != nil { return nil, errors.Trace(err1) } - values[col.GetColumnId()] = handleData + values[offset] = handleData continue } - _, ok := values[col.GetColumnId()] - if ok { + if hasColVal(values, colIDs, id) { continue } if len(col.DefaultVal) > 0 { - values[col.GetColumnId()] = col.DefaultVal + values[offset] = col.DefaultVal continue } if mysql.HasNotNullFlag(uint(col.GetFlag())) { - return nil, errors.New("Miss column") + return nil, errors.Errorf("Miss column %d", id) } - values[col.GetColumnId()] = []byte{codec.NilFlag} + values[offset] = []byte{codec.NilFlag} } return values, nil } -// setColumnValueToEval puts column values into evaluator, the values will be used for expr evaluation. -func setColumnValueToEval(eval *xeval.Evaluator, handle int64, row map[int64][]byte, cols map[int64]*tipb.ColumnInfo) error { - for colID, col := range cols { - if col.GetPkHandle() { - if mysql.HasUnsignedFlag(uint(col.GetFlag())) { - eval.Row[colID] = types.NewUintDatum(uint64(handle)) - } else { - eval.Row[colID] = types.NewIntDatum(handle) +func extractColIDsInExpr(expr *tipb.Expr, columns []*tipb.ColumnInfo, collector map[int64]int) error { + if expr == nil { + return nil + } + if expr.GetTp() == tipb.ExprType_ColumnRef { + _, i, err := codec.DecodeInt(expr.Val) + if err != nil { + return errors.Trace(err) + } + for idx, c := range columns { + if c.GetColumnId() == i { + collector[i] = idx + return nil } - } else { - data := row[colID] - ft := distsql.FieldTypeFromPBColumn(col) - datum, err := tablecodec.DecodeColumnValue(data, ft) - if err != nil { - return errors.Trace(err) - } - eval.Row[colID] = datum + } + return xeval.ErrInvalid.Gen("column %d not found", i) + } + for _, child := range expr.Children { + err := extractColIDsInExpr(child, columns, collector) + if err != nil { + return errors.Trace(err) } } return nil diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index c69ec484b0..b9fbcd74dd 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -298,7 +298,48 @@ func DecodeRow(b []byte, cols map[int64]*types.FieldType) (map[int64]types.Datum return row, nil } -// CutRow cut encoded row into byte slices and return interested columns' byte slice. +// CutRowNew cuts encoded row into byte slices and return columns' byte slice. +// Row layout: colID1, value1, colID2, value2, ..... +func CutRowNew(data []byte, colIDs map[int64]int) ([][]byte, error) { + if data == nil { + return nil, nil + } + if len(data) == 1 && data[0] == codec.NilFlag { + return nil, nil + } + + var ( + cnt int + b []byte + err error + ) + row := make([][]byte, len(colIDs)) + for len(data) > 0 && cnt < len(colIDs) { + // Get col id. + b, data, err = codec.CutOne(data) + if err != nil { + return nil, errors.Trace(err) + } + _, cid, err := codec.DecodeOne(b) + if err != nil { + return nil, errors.Trace(err) + } + // Get col value. + b, data, err = codec.CutOne(data) + if err != nil { + return nil, errors.Trace(err) + } + id := cid.GetInt64() + offset, ok := colIDs[id] + if ok { + row[offset] = b + cnt++ + } + } + return row, nil +} + +// CutRow cuts encoded row into byte slices and return interested columns' byte slice. // Row layout: colID1, value1, colID2, value2, ..... func CutRow(data []byte, cols map[int64]*types.FieldType) (map[int64][]byte, error) { if data == nil { @@ -421,6 +462,23 @@ func CutIndexKey(key kv.Key, colIDs []int64) (values map[int64][]byte, b []byte, return } +// CutIndexKeyNew cuts encoded index key into colIDs to bytes slices. +// The returned value b is the remaining bytes of the key which would be empty if it is unique index or handle data +// if it is non-unique index. +func CutIndexKeyNew(key kv.Key, length int) (values [][]byte, b []byte, err error) { + b = key[prefixLen+idLen:] + values = make([][]byte, 0, length) + for i := 0; i < length; i++ { + var val []byte + val, b, err = codec.CutOne(b) + if err != nil { + return nil, nil, errors.Trace(err) + } + values = append(values, val) + } + return +} + // EncodeTableIndexPrefix encodes index prefix with tableID and idxID. func EncodeTableIndexPrefix(tableID, idxID int64) kv.Key { key := make([]byte, 0, prefixLen) diff --git a/tablecodec/tablecodec_test.go b/tablecodec/tablecodec_test.go index 597f5134e6..e42c693bbe 100644 --- a/tablecodec/tablecodec_test.go +++ b/tablecodec/tablecodec_test.go @@ -178,6 +178,72 @@ func (s *testTableCodecSuite) TestTimeCodec(c *C) { } } +func (s *testTableCodecSuite) TestCutRow(c *C) { + defer testleak.AfterTest(c)() + + var err error + c1 := &column{id: 1, tp: types.NewFieldType(mysql.TypeLonglong)} + c2 := &column{id: 2, tp: types.NewFieldType(mysql.TypeVarchar)} + c3 := &column{id: 3, tp: types.NewFieldType(mysql.TypeNewDecimal)} + cols := []*column{c1, c2, c3} + + row := make([]types.Datum, 3) + row[0] = types.NewIntDatum(100) + row[1] = types.NewBytesDatum([]byte("abc")) + row[2] = types.NewDecimalDatum(types.NewDecFromInt(1)) + + data := make([][]byte, 3) + data[0], err = EncodeValue(row[0]) + c.Assert(err, IsNil) + data[1], err = EncodeValue(row[1]) + c.Assert(err, IsNil) + data[2], err = EncodeValue(row[2]) + c.Assert(err, IsNil) + // Encode + colIDs := make([]int64, 0, 3) + for _, col := range cols { + colIDs = append(colIDs, col.id) + } + bs, err := EncodeRow(row, colIDs) + c.Assert(err, IsNil) + c.Assert(bs, NotNil) + + // Decode + colMap := make(map[int64]int, 3) + for i, col := range cols { + colMap[col.id] = i + } + r, err := CutRowNew(bs, colMap) + c.Assert(err, IsNil) + c.Assert(r, NotNil) + c.Assert(r, HasLen, 3) + // Compare cut row and original row + for i := range colIDs { + c.Assert(r[i], DeepEquals, data[i]) + } +} + +func (s *testTableCodecSuite) TestCutKeyNew(c *C) { + values := []types.Datum{types.NewIntDatum(1), types.NewBytesDatum([]byte("abc")), types.NewFloat64Datum(5.5)} + handle := types.NewIntDatum(100) + values = append(values, handle) + encodedValue, err := codec.EncodeKey(nil, values...) + c.Assert(err, IsNil) + tableID := int64(4) + indexID := int64(5) + indexKey := EncodeIndexSeekKey(tableID, indexID, encodedValue) + valuesBytes, handleBytes, err := CutIndexKeyNew(indexKey, 3) + c.Assert(err, IsNil) + for i := 0; i < 3; i++ { + valueBytes := valuesBytes[i] + var val types.Datum + _, val, _ = codec.DecodeOne(valueBytes) + c.Assert(val, DeepEquals, values[i]) + } + _, handleVal, _ := codec.DecodeOne(handleBytes) + c.Assert(handleVal, DeepEquals, types.NewIntDatum(100)) +} + func (s *testTableCodecSuite) TestCutKey(c *C) { colIDs := []int64{1, 2, 3} values := []types.Datum{types.NewIntDatum(1), types.NewBytesDatum([]byte("abc")), types.NewFloat64Datum(5.5)}