executor: improve UnionScanRead performance (#32668)
ref pingcap/tidb#32433
This commit is contained in:
@ -180,6 +180,7 @@ type memTableReader struct {
|
||||
buffer allocBuf
|
||||
pkColIDs []int64
|
||||
cacheTable kv.MemBuffer
|
||||
offsets []int
|
||||
}
|
||||
|
||||
type allocBuf struct {
|
||||
@ -240,18 +241,25 @@ func (m *memTableReader) getMemRows(ctx context.Context) ([][]types.Datum, error
|
||||
opentracing.ContextWithSpan(ctx, span1)
|
||||
}
|
||||
mutableRow := chunk.MutRowFromTypes(m.retFieldTypes)
|
||||
resultRows := make([]types.Datum, len(m.columns))
|
||||
m.offsets = make([]int, len(m.columns))
|
||||
for i, col := range m.columns {
|
||||
m.offsets[i] = m.colIDs[col.ID]
|
||||
}
|
||||
err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, func(key, value []byte) error {
|
||||
row, err := m.decodeRecordKeyValue(key, value)
|
||||
var err error
|
||||
resultRows, err = m.decodeRecordKeyValue(key, value, &resultRows)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mutableRow.SetDatums(row...)
|
||||
mutableRow.SetDatums(resultRows...)
|
||||
matched, _, err := expression.EvalBool(m.ctx, m.conditions, mutableRow.ToRow())
|
||||
if err != nil || !matched {
|
||||
return err
|
||||
}
|
||||
m.addedRows = append(m.addedRows, row)
|
||||
m.addedRows = append(m.addedRows, resultRows)
|
||||
resultRows = make([]types.Datum, len(m.columns))
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
@ -265,30 +273,29 @@ func (m *memTableReader) getMemRows(ctx context.Context) ([][]types.Datum, error
|
||||
return m.addedRows, nil
|
||||
}
|
||||
|
||||
func (m *memTableReader) decodeRecordKeyValue(key, value []byte) ([]types.Datum, error) {
|
||||
func (m *memTableReader) decodeRecordKeyValue(key, value []byte, resultRows *[]types.Datum) ([]types.Datum, error) {
|
||||
handle, err := tablecodec.DecodeRowKey(key)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return m.decodeRowData(handle, value)
|
||||
return m.decodeRowData(handle, value, resultRows)
|
||||
}
|
||||
|
||||
// decodeRowData uses to decode row data value.
|
||||
func (m *memTableReader) decodeRowData(handle kv.Handle, value []byte) ([]types.Datum, error) {
|
||||
func (m *memTableReader) decodeRowData(handle kv.Handle, value []byte, resultRows *[]types.Datum) ([]types.Datum, error) {
|
||||
values, err := m.getRowData(handle, value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ds := make([]types.Datum, 0, len(m.columns))
|
||||
for _, col := range m.columns {
|
||||
offset := m.colIDs[col.ID]
|
||||
d, err := tablecodec.DecodeColumnValue(values[offset], &col.FieldType, m.ctx.GetSessionVars().Location())
|
||||
for i, col := range m.columns {
|
||||
var datum types.Datum
|
||||
err := tablecodec.DecodeColumnValueWithDatum(values[m.offsets[i]], &col.FieldType, m.ctx.GetSessionVars().Location(), &datum)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ds = append(ds, d)
|
||||
(*resultRows)[i] = datum
|
||||
}
|
||||
return ds, nil
|
||||
return *resultRows, nil
|
||||
}
|
||||
|
||||
// getRowData decodes raw byte slice to row data.
|
||||
|
||||
@ -375,6 +375,20 @@ func DecodeColumnValue(data []byte, ft *types.FieldType, loc *time.Location) (ty
|
||||
return colDatum, nil
|
||||
}
|
||||
|
||||
// DecodeColumnValueWithDatum decodes data to an existing Datum according to the column info.
|
||||
func DecodeColumnValueWithDatum(data []byte, ft *types.FieldType, loc *time.Location, result *types.Datum) error {
|
||||
var err error
|
||||
_, *result, err = codec.DecodeOne(data)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
*result, err = Unflatten(*result, ft, loc)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DecodeRowWithMapNew decode a row to datum map.
|
||||
func DecodeRowWithMapNew(b []byte, cols map[int64]*types.FieldType,
|
||||
loc *time.Location, row map[int64]types.Datum) (map[int64]types.Datum, error) {
|
||||
@ -401,7 +415,7 @@ func DecodeRowWithMapNew(b []byte, cols map[int64]*types.FieldType,
|
||||
return rd.DecodeToDatumMap(b, row)
|
||||
}
|
||||
|
||||
// DecodeRowWithMap decodes a byte slice into datums with a existing row map.
|
||||
// DecodeRowWithMap decodes a byte slice into datums with an existing row map.
|
||||
// Row layout: colID1, value1, colID2, value2, .....
|
||||
func DecodeRowWithMap(b []byte, cols map[int64]*types.FieldType, loc *time.Location, row map[int64]types.Datum) (map[int64]types.Datum, error) {
|
||||
if row == nil {
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
package rowcodec
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@ -452,7 +453,7 @@ func (decoder *BytesDecoder) tryDecodeHandle(values [][]byte, offset int, col *C
|
||||
return false
|
||||
}
|
||||
|
||||
// DecodeToBytesNoHandle decodes raw byte slice to row dat without handle.
|
||||
// DecodeToBytesNoHandle decodes raw byte slice to row data without handle.
|
||||
func (decoder *BytesDecoder) DecodeToBytesNoHandle(outputOffset map[int64]int, value []byte) ([][]byte, error) {
|
||||
return decoder.decodeToBytesInternal(outputOffset, nil, value, nil)
|
||||
}
|
||||
@ -463,7 +464,7 @@ func (decoder *BytesDecoder) DecodeToBytes(outputOffset map[int64]int, handle kv
|
||||
}
|
||||
|
||||
func (decoder *BytesDecoder) encodeOldDatum(tp byte, val []byte) []byte {
|
||||
var buf []byte
|
||||
buf := make([]byte, 0, 1+binary.MaxVarintLen64+len(val))
|
||||
switch tp {
|
||||
case BytesFlag:
|
||||
buf = append(buf, CompactBytesFlag)
|
||||
|
||||
Reference in New Issue
Block a user