executor: union scan refactor, optimize the membuffer decoding process (#45051)

ref pingcap/tidb#43249
This commit is contained in:
tiancaiamao
2023-07-06 12:41:15 +08:00
committed by GitHub
parent 24e3c28785
commit fbbd9b0d32
3 changed files with 49 additions and 7 deletions

View File

@ -5115,6 +5115,13 @@ func NewRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model
}
}
defVal := func(i int, chk *chunk.Chunk) error {
if reqCols[i].ID < 0 {
// model.ExtraHandleID, ExtraPidColID, ExtraPhysTblID... etc
// Don't set the default value for that column.
chk.AppendNull(i)
return nil
}
ci := getColInfoByID(tbl, reqCols[i].ID)
d, err := table.GetColOriginDefaultValue(ctx, ci)
if err != nil {

View File

@ -199,6 +199,7 @@ type allocBuf struct {
// cache for decode handle.
handleBytes []byte
rd *rowcodec.BytesDecoder
cd *rowcodec.ChunkDecoder
}
func buildMemTableReader(ctx context.Context, us *UnionScanExec, kvRanges []kv.KeyRange) *memTableReader {
@ -230,6 +231,7 @@ func buildMemTableReader(ctx context.Context, us *UnionScanExec, kvRanges []kv.K
}
return tablecodec.EncodeValue(us.Ctx().GetSessionVars().StmtCtx, nil, d)
}
cd := NewRowDecoder(us.Ctx(), us.Schema(), us.table.Meta())
rd := rowcodec.NewByteDecoder(colInfo, pkColIDs, defVal, us.Ctx().GetSessionVars().Location())
return &memTableReader{
ctx: us.Ctx(),
@ -242,6 +244,7 @@ func buildMemTableReader(ctx context.Context, us *UnionScanExec, kvRanges []kv.K
buffer: allocBuf{
handleBytes: make([]byte, 0, 16),
rd: rd,
cd: cd,
},
pkColIDs: pkColIDs,
cacheTable: us.cacheTable,
@ -255,6 +258,10 @@ type txnMemBufferIter struct {
txn kv.Transaction
idx int
curr kv.Iterator
cd *rowcodec.ChunkDecoder
chk *chunk.Chunk
datumRow []types.Datum
}
func (iter *txnMemBufferIter) Next() ([]types.Datum, error) {
@ -299,22 +306,47 @@ func (iter *txnMemBufferIter) next() ([]types.Datum, error) {
continue
}
mutableRow := chunk.MutRowFromTypes(iter.retFieldTypes)
resultRows := make([]types.Datum, len(iter.columns))
resultRows, err = iter.decodeRecordKeyValue(curr.Key(), curr.Value(), &resultRows)
handle, err := tablecodec.DecodeRowKey(curr.Key())
if err != nil {
return nil, errors.Trace(err)
}
iter.chk.Reset()
if !rowcodec.IsNewFormat(curr.Value()) {
// TODO: remove the legacy code!
// fallback to the old way.
iter.datumRow, err = iter.decodeRecordKeyValue(curr.Key(), curr.Value(), &iter.datumRow)
if err != nil {
return nil, errors.Trace(err)
}
mutableRow := chunk.MutRowFromTypes(iter.retFieldTypes)
mutableRow.SetDatums(iter.datumRow...)
matched, _, err := expression.EvalBool(iter.ctx, iter.conditions, mutableRow.ToRow())
if err != nil {
return nil, errors.Trace(err)
}
if !matched {
continue
}
return iter.datumRow, curr.Next()
}
err = iter.cd.DecodeToChunk(curr.Value(), handle, iter.chk)
if err != nil {
return nil, errors.Trace(err)
}
mutableRow.SetDatums(resultRows...)
matched, _, err := expression.EvalBool(iter.ctx, iter.conditions, mutableRow.ToRow())
row := iter.chk.GetRow(0)
matched, _, err := expression.EvalBool(iter.ctx, iter.conditions, row)
if err != nil {
return nil, errors.Trace(err)
}
if !matched {
continue
}
return resultRows, curr.Next()
ret := row.GetDatumRowWithBuffer(iter.retFieldTypes, iter.datumRow)
return ret, curr.Next()
}
return nil, err
}
@ -341,6 +373,9 @@ func (m *memTableReader) getMemRowsIter(ctx context.Context) (memRowsIter, error
return &txnMemBufferIter{
memTableReader: m,
txn: txn,
cd: m.buffer.cd,
chk: chunk.New(m.retFieldTypes, 1, 1),
datumRow: make([]types.Datum, len(m.retFieldTypes)),
}, nil
}

View File

@ -629,7 +629,7 @@ func getColDefaultValueFromNil(ctx sessionctx.Context, col *model.ColumnInfo, ar
sc.AppendWarning(ErrColumnCantNull.FastGenByArgs(col.Name))
return GetZeroValue(col), nil
}
return types.Datum{}, ErrNoDefaultValue.FastGenByArgs(col.Name)
return types.Datum{}, ErrNoDefaultValue.GenWithStackByArgs(col.Name)
}
// GetZeroValue gets zero value for given column type.