308 lines
8.8 KiB
Go
308 lines
8.8 KiB
Go
// Copyright 2019 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package executor
|
|
|
|
import (
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/parser/model"
|
|
"github.com/pingcap/parser/mysql"
|
|
"github.com/pingcap/tidb/expression"
|
|
"github.com/pingcap/tidb/kv"
|
|
"github.com/pingcap/tidb/sessionctx"
|
|
"github.com/pingcap/tidb/sessionctx/stmtctx"
|
|
"github.com/pingcap/tidb/tablecodec"
|
|
"github.com/pingcap/tidb/types"
|
|
"github.com/pingcap/tidb/util/chunk"
|
|
"github.com/pingcap/tidb/util/codec"
|
|
"github.com/pingcap/tidb/util/set"
|
|
)
|
|
|
|
type memIndexReader struct {
|
|
ctx sessionctx.Context
|
|
index *model.IndexInfo
|
|
table *model.TableInfo
|
|
kvRanges []kv.KeyRange
|
|
desc bool
|
|
conditions []expression.Expression
|
|
addedRows [][]types.Datum
|
|
retFieldTypes []*types.FieldType
|
|
outputOffset []int
|
|
// cache for decode handle.
|
|
handleBytes []byte
|
|
// memIdxHandles is uses to store the handle ids that has been read by memIndexReader.
|
|
memIdxHandles set.Int64Set
|
|
// belowHandleIndex is the handle's position of the below scan plan.
|
|
belowHandleIndex int
|
|
}
|
|
|
|
func buildMemIndexReader(us *UnionScanExec, idxReader *IndexReaderExecutor) *memIndexReader {
|
|
kvRanges := idxReader.kvRanges
|
|
outputOffset := make([]int, 0, len(us.columns))
|
|
for _, col := range idxReader.outputColumns {
|
|
outputOffset = append(outputOffset, col.Index)
|
|
}
|
|
return &memIndexReader{
|
|
ctx: us.ctx,
|
|
index: idxReader.index,
|
|
table: idxReader.table.Meta(),
|
|
kvRanges: kvRanges,
|
|
desc: us.desc,
|
|
conditions: us.conditions,
|
|
addedRows: make([][]types.Datum, 0, len(us.dirty.addedRows)),
|
|
retFieldTypes: retTypes(us),
|
|
outputOffset: outputOffset,
|
|
handleBytes: make([]byte, 0, 16),
|
|
memIdxHandles: set.NewInt64Set(),
|
|
belowHandleIndex: us.belowHandleIndex,
|
|
}
|
|
}
|
|
|
|
func (m *memIndexReader) getMemRows() ([][]types.Datum, error) {
|
|
tps := make([]*types.FieldType, 0, len(m.index.Columns)+1)
|
|
cols := m.table.Columns
|
|
for _, col := range m.index.Columns {
|
|
tps = append(tps, &cols[col.Offset].FieldType)
|
|
}
|
|
if m.table.PKIsHandle {
|
|
for _, col := range m.table.Columns {
|
|
if mysql.HasPriKeyFlag(col.Flag) {
|
|
tps = append(tps, &col.FieldType)
|
|
break
|
|
}
|
|
}
|
|
} else {
|
|
// ExtraHandle Column tp.
|
|
tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
|
|
}
|
|
|
|
mutableRow := chunk.MutRowFromTypes(m.retFieldTypes)
|
|
err := iterTxnMemBuffer(m.ctx, m.kvRanges, func(key, value []byte) error {
|
|
data, err := m.decodeIndexKeyValue(key, value, tps)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
handle := data[m.belowHandleIndex].GetInt64()
|
|
m.memIdxHandles.Insert(handle)
|
|
|
|
mutableRow.SetDatums(data...)
|
|
matched, _, err := expression.EvalBool(m.ctx, m.conditions, mutableRow.ToRow())
|
|
if err != nil || !matched {
|
|
return err
|
|
}
|
|
m.addedRows = append(m.addedRows, data)
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// TODO: After refine `IterReverse`, remove below logic and use `IterReverse` when do reverse scan.
|
|
if m.desc {
|
|
reverseDatumSlice(m.addedRows)
|
|
}
|
|
return m.addedRows, nil
|
|
}
|
|
|
|
func (m *memIndexReader) decodeIndexKeyValue(key, value []byte, tps []*types.FieldType) ([]types.Datum, error) {
|
|
pkStatus := tablecodec.PrimaryKeyIsSigned
|
|
if mysql.HasUnsignedFlag(tps[len(tps)-1].Flag) {
|
|
pkStatus = tablecodec.PrimaryKeyIsUnsigned
|
|
}
|
|
values, err := tablecodec.DecodeIndexKV(key, value, len(m.index.Columns), pkStatus)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
|
|
ds := make([]types.Datum, 0, len(m.outputOffset))
|
|
for _, offset := range m.outputOffset {
|
|
d, err := tablecodec.DecodeColumnValue(values[offset], tps[offset], m.ctx.GetSessionVars().TimeZone)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ds = append(ds, d)
|
|
}
|
|
return ds, nil
|
|
}
|
|
|
|
type memTableReader struct {
|
|
ctx sessionctx.Context
|
|
table *model.TableInfo
|
|
columns []*model.ColumnInfo
|
|
kvRanges []kv.KeyRange
|
|
desc bool
|
|
conditions []expression.Expression
|
|
addedRows [][]types.Datum
|
|
retFieldTypes []*types.FieldType
|
|
colIDs map[int64]int
|
|
// cache for decode handle.
|
|
handleBytes []byte
|
|
}
|
|
|
|
func buildMemTableReader(us *UnionScanExec, tblReader *TableReaderExecutor) *memTableReader {
|
|
kvRanges := tblReader.kvRanges
|
|
colIDs := make(map[int64]int)
|
|
for i, col := range tblReader.columns {
|
|
colIDs[col.ID] = i
|
|
}
|
|
|
|
return &memTableReader{
|
|
ctx: us.ctx,
|
|
table: tblReader.table.Meta(),
|
|
columns: us.columns,
|
|
kvRanges: kvRanges,
|
|
desc: us.desc,
|
|
conditions: us.conditions,
|
|
addedRows: make([][]types.Datum, 0, len(us.dirty.addedRows)),
|
|
retFieldTypes: retTypes(us),
|
|
colIDs: colIDs,
|
|
handleBytes: make([]byte, 0, 16),
|
|
}
|
|
}
|
|
|
|
// TODO: Try to make memXXXReader lazy, There is no need to decode many rows when parent operator only need 1 row.
|
|
func (m *memTableReader) getMemRows() ([][]types.Datum, error) {
|
|
mutableRow := chunk.MutRowFromTypes(m.retFieldTypes)
|
|
err := iterTxnMemBuffer(m.ctx, m.kvRanges, func(key, value []byte) error {
|
|
row, err := m.decodeRecordKeyValue(key, value)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
mutableRow.SetDatums(row...)
|
|
matched, _, err := expression.EvalBool(m.ctx, m.conditions, mutableRow.ToRow())
|
|
if err != nil || !matched {
|
|
return err
|
|
}
|
|
m.addedRows = append(m.addedRows, row)
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// TODO: After refine `IterReverse`, remove below logic and use `IterReverse` when do reverse scan.
|
|
if m.desc {
|
|
reverseDatumSlice(m.addedRows)
|
|
}
|
|
return m.addedRows, nil
|
|
}
|
|
|
|
func (m *memTableReader) decodeRecordKeyValue(key, value []byte) ([]types.Datum, error) {
|
|
handle, err := tablecodec.DecodeRowKey(key)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
return decodeRowData(m.ctx, m.table, m.columns, m.colIDs, handle, m.handleBytes, value)
|
|
}
|
|
|
|
// decodeRowData uses to decode row data value.
|
|
func decodeRowData(ctx sessionctx.Context, tb *model.TableInfo, columns []*model.ColumnInfo, colIDs map[int64]int, handle int64, cacheBytes, value []byte) ([]types.Datum, error) {
|
|
values, err := getRowData(ctx.GetSessionVars().StmtCtx, tb, columns, colIDs, handle, cacheBytes, value)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ds := make([]types.Datum, 0, len(columns))
|
|
for _, col := range columns {
|
|
offset := colIDs[col.ID]
|
|
d, err := tablecodec.DecodeColumnValue(values[offset], &col.FieldType, ctx.GetSessionVars().TimeZone)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ds = append(ds, d)
|
|
}
|
|
return ds, nil
|
|
}
|
|
|
|
// getRowData decodes raw byte slice to row data.
|
|
func getRowData(ctx *stmtctx.StatementContext, tb *model.TableInfo, columns []*model.ColumnInfo, colIDs map[int64]int, handle int64, cacheBytes, value []byte) ([][]byte, error) {
|
|
pkIsHandle := tb.PKIsHandle
|
|
values, err := tablecodec.CutRowNew(value, colIDs)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if values == nil {
|
|
values = make([][]byte, len(colIDs))
|
|
}
|
|
// Fill the handle and null columns.
|
|
for _, col := range columns {
|
|
id := col.ID
|
|
offset := colIDs[id]
|
|
if (pkIsHandle && mysql.HasPriKeyFlag(col.Flag)) || id == model.ExtraHandleID {
|
|
var handleDatum types.Datum
|
|
if mysql.HasUnsignedFlag(col.Flag) {
|
|
// PK column is Unsigned.
|
|
handleDatum = types.NewUintDatum(uint64(handle))
|
|
} else {
|
|
handleDatum = types.NewIntDatum(handle)
|
|
}
|
|
handleData, err1 := codec.EncodeValue(ctx, cacheBytes, handleDatum)
|
|
if err1 != nil {
|
|
return nil, errors.Trace(err1)
|
|
}
|
|
values[offset] = handleData
|
|
continue
|
|
}
|
|
if hasColVal(values, colIDs, id) {
|
|
continue
|
|
}
|
|
// no need to fill default value.
|
|
values[offset] = []byte{codec.NilFlag}
|
|
}
|
|
|
|
return values, nil
|
|
}
|
|
|
|
// hasColVal reports whether id is present in colIDs and the slot it maps to in
// data is non-nil.
func hasColVal(data [][]byte, colIDs map[int64]int, id int64) bool {
	pos, known := colIDs[id]
	return known && data[pos] != nil
}
|
|
|
|
// processKVFunc is the callback iterTxnMemBuffer invokes with the key and
// value of each entry it visits; a non-nil error stops the iteration.
type processKVFunc func(key, value []byte) error
|
|
|
|
func iterTxnMemBuffer(ctx sessionctx.Context, kvRanges []kv.KeyRange, fn processKVFunc) error {
|
|
txn, err := ctx.Txn(true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, rg := range kvRanges {
|
|
iter, err := txn.GetMemBuffer().Iter(rg.StartKey, rg.EndKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for ; iter.Valid(); err = iter.Next() {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// check whether the key was been deleted.
|
|
if len(iter.Value()) == 0 {
|
|
continue
|
|
}
|
|
err = fn(iter.Key(), iter.Value())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func reverseDatumSlice(rows [][]types.Datum) {
|
|
for i, j := 0, len(rows)-1; i < j; i, j = i+1, j-1 {
|
|
rows[i], rows[j] = rows[j], rows[i]
|
|
}
|
|
}
|