plans: Implement next for simple table selection.
This commit is contained in:
@ -52,13 +52,13 @@ type Plan interface {
|
||||
|
||||
// NextPlan is the interface for plans that has implemented next.
|
||||
type NextPlan interface {
|
||||
ImplementedNext() bool
|
||||
UseNext() bool
|
||||
}
|
||||
|
||||
// ImplementedNext checks if p has implemented Next method.
|
||||
func ImplementedNext(p Plan) bool {
|
||||
// UseNext checks if p has implemented Next method.
|
||||
func UseNext(p Plan) bool {
|
||||
if np, ok := p.(NextPlan); ok {
|
||||
return np.ImplementedNext()
|
||||
return np.UseNext()
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@ -95,7 +95,7 @@ func (r *ExplainDefaultPlan) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ImplementedNext implements NextPlan interface.
|
||||
func (r *ExplainDefaultPlan) ImplementedNext() bool {
|
||||
// UseNext implements NextPlan interface
|
||||
func (r *ExplainDefaultPlan) UseNext() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
@ -20,6 +20,7 @@ package plans
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/juju/errors"
|
||||
"github.com/pingcap/tidb/context"
|
||||
"github.com/pingcap/tidb/expression"
|
||||
"github.com/pingcap/tidb/expression/expressions"
|
||||
@ -36,7 +37,8 @@ var (
|
||||
// SelectFieldsDefaultPlan extracts specific fields from Src Plan.
|
||||
type SelectFieldsDefaultPlan struct {
|
||||
*SelectList
|
||||
Src plan.Plan
|
||||
Src plan.Plan
|
||||
evalArgs map[interface{}]interface{}
|
||||
}
|
||||
|
||||
// Explain implements the plan.Plan Explain interface.
|
||||
@ -78,17 +80,42 @@ func (r *SelectFieldsDefaultPlan) Do(ctx context.Context, f plan.RowIterFunc) er
|
||||
|
||||
// Next implements plan.Plan Next interface.
|
||||
func (r *SelectFieldsDefaultPlan) Next(ctx context.Context) (row *plan.Row, err error) {
|
||||
if r.evalArgs == nil {
|
||||
r.evalArgs = map[interface{}]interface{}{}
|
||||
}
|
||||
srcRow, err := r.Src.Next(ctx)
|
||||
if err != nil || srcRow == nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
r.evalArgs[expressions.ExprEvalIdentFunc] = func(name string) (interface{}, error) {
|
||||
return getIdentValue(name, r.Src.GetFields(), srcRow.Data, field.DefaultFieldFlag)
|
||||
}
|
||||
row = &plan.Row{
|
||||
Data: make([]interface{}, len(r.Fields)),
|
||||
}
|
||||
for i, fld := range r.Fields {
|
||||
var err error
|
||||
if row.Data[i], err = fld.Expr.Eval(ctx, r.evalArgs); err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Close implements plan.Plan Close interface.
|
||||
func (r *SelectFieldsDefaultPlan) Close() error {
|
||||
return nil
|
||||
return r.Src.Close()
|
||||
}
|
||||
|
||||
// UseNext implements NextPlan interface
|
||||
func (r *SelectFieldsDefaultPlan) UseNext() bool {
|
||||
return plan.UseNext(r.Src)
|
||||
}
|
||||
|
||||
// SelectEmptyFieldListPlan is the plan for "select expr, expr, ..."" with no FROM.
|
||||
type SelectEmptyFieldListPlan struct {
|
||||
Fields []*field.Field
|
||||
done bool
|
||||
}
|
||||
|
||||
// Do implements the plan.Plan Do interface, returns empty row.
|
||||
@ -128,6 +155,13 @@ func (s *SelectEmptyFieldListPlan) Filter(ctx context.Context, expr expression.E
|
||||
|
||||
// Next implements plan.Plan Next interface.
|
||||
func (s *SelectEmptyFieldListPlan) Next(ctx context.Context) (row *plan.Row, err error) {
|
||||
if s.done {
|
||||
return
|
||||
}
|
||||
row = &plan.Row{
|
||||
Data: make([]interface{}, len(s.Fields)),
|
||||
}
|
||||
s.done = true
|
||||
return
|
||||
}
|
||||
|
||||
@ -135,3 +169,8 @@ func (s *SelectEmptyFieldListPlan) Next(ctx context.Context) (row *plan.Row, err
|
||||
func (s *SelectEmptyFieldListPlan) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// UseNext implements NextPlan interface.
|
||||
func (s *SelectEmptyFieldListPlan) UseNext() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
"github.com/pingcap/tidb/field"
|
||||
"github.com/pingcap/tidb/plan"
|
||||
"github.com/pingcap/tidb/util/format"
|
||||
"github.com/reborndb/go/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -81,10 +82,35 @@ func (r *SelectFinalPlan) Filter(ctx context.Context, expr expression.Expression
|
||||
|
||||
// Next implements plan.Plan Next interface.
|
||||
func (r *SelectFinalPlan) Next(ctx context.Context) (row *plan.Row, err error) {
|
||||
row, err = r.Src.Next(ctx)
|
||||
if row == nil || err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
row.Data = row.Data[:r.HiddenFieldOffset]
|
||||
for i, o := range row.Data {
|
||||
switch v := o.(type) {
|
||||
case bool:
|
||||
// Convert bool field to int
|
||||
if v {
|
||||
row.Data[i] = uint8(1)
|
||||
} else {
|
||||
row.Data[i] = uint8(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
if !r.infered {
|
||||
setResultFieldInfo(r.ResultFields[0:r.HiddenFieldOffset], row.Data)
|
||||
r.infered = true
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Close implements plan.Plan Close interface.
|
||||
func (r *SelectFinalPlan) Close() error {
|
||||
return nil
|
||||
return r.Src.Close()
|
||||
}
|
||||
|
||||
// UseNext implements NextPlan interface
|
||||
func (r *SelectFinalPlan) UseNext() bool {
|
||||
return plan.UseNext(r.Src)
|
||||
}
|
||||
|
||||
@ -21,6 +21,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/juju/errors"
|
||||
"github.com/ngaut/log"
|
||||
"github.com/pingcap/tidb/column"
|
||||
"github.com/pingcap/tidb/context"
|
||||
"github.com/pingcap/tidb/expression"
|
||||
@ -110,6 +111,7 @@ func (r *TableNilPlan) Close() error {
|
||||
type TableDefaultPlan struct {
|
||||
T table.Table
|
||||
Fields []*field.ResultField
|
||||
iter kv.Iterator
|
||||
}
|
||||
|
||||
// Explain implements the plan.Plan Explain interface.
|
||||
@ -328,10 +330,64 @@ func (r *TableDefaultPlan) GetFields() []*field.ResultField {
|
||||
|
||||
// Next implements plan.Plan Next interface.
|
||||
func (r *TableDefaultPlan) Next(ctx context.Context) (row *plan.Row, err error) {
|
||||
if r.iter == nil {
|
||||
var txn kv.Transaction
|
||||
txn, err = ctx.GetTxn(false)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
r.iter, err = txn.Seek([]byte(r.T.FirstKey()), nil)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
if !r.iter.Valid() || !strings.HasPrefix(r.iter.Key(), r.T.KeyPrefix()) {
|
||||
return
|
||||
}
|
||||
// TODO: check if lock valid
|
||||
// the record layout in storage (key -> value):
|
||||
// r1 -> lock-version
|
||||
// r1_col1 -> r1 col1 value
|
||||
// r1_col2 -> r1 col2 value
|
||||
// r2 -> lock-version
|
||||
// r2_col1 -> r2 col1 value
|
||||
// r2_col2 -> r2 col2 value
|
||||
// ...
|
||||
rowKey := r.iter.Key()
|
||||
h, err := util.DecodeHandleFromRowKey(rowKey)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
// TODO: we could just fetch mentioned columns' values
|
||||
row = &plan.Row{}
|
||||
row.Data, err = r.T.Row(ctx, h)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
// Put rowKey to the tail of record row
|
||||
rke := &plan.RowKeyEntry{
|
||||
Tbl: r.T,
|
||||
Key: rowKey,
|
||||
}
|
||||
row.RowKeys = append(row.RowKeys, rke)
|
||||
|
||||
rk := r.T.RecordKey(h, nil)
|
||||
r.iter, err = kv.NextUntil(r.iter, util.RowKeyPrefixFilter(rk))
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Close implements plan.Plan Close interface.
|
||||
func (r *TableDefaultPlan) Close() error {
|
||||
r.iter.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// UseNext implements NextPlan interface
|
||||
func (r *TableDefaultPlan) UseNext() bool {
|
||||
log.Warn("use next")
|
||||
return true
|
||||
}
|
||||
|
||||
@ -27,6 +27,7 @@ import (
|
||||
mysql "github.com/pingcap/tidb/mysqldef"
|
||||
"github.com/pingcap/tidb/parser/opcode"
|
||||
"github.com/pingcap/tidb/plan/plans"
|
||||
"github.com/pingcap/tidb/rset/rsets"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/table"
|
||||
"github.com/pingcap/tidb/table/tables"
|
||||
@ -125,7 +126,8 @@ func (p *testFromSuit) TestTableDefaultPlan(c *C) {
|
||||
}
|
||||
|
||||
ret := map[int64]string{}
|
||||
pln.Do(p, func(id interface{}, data []interface{}) (bool, error) {
|
||||
rset := rsets.Recordset{Ctx: p, Plan: pln}
|
||||
rset.Do(func(data []interface{}) (bool, error) {
|
||||
ret[data[0].(int64)] = data[1].(string)
|
||||
return true, nil
|
||||
})
|
||||
|
||||
@ -345,10 +345,15 @@ func appendRow(prefix []interface{}, in []interface{}) []interface{} {
|
||||
|
||||
// Next implements plan.Plan Next interface.
|
||||
func (r *JoinPlan) Next(ctx context.Context) (row *plan.Row, err error) {
|
||||
return
|
||||
return r.Left.Next(ctx)
|
||||
}
|
||||
|
||||
// Close implements plan.Plan Close interface.
|
||||
func (r *JoinPlan) Close() error {
|
||||
return nil
|
||||
return r.Left.Close()
|
||||
}
|
||||
|
||||
// UseNext implements NextPlan interface
|
||||
func (r *JoinPlan) UseNext() bool {
|
||||
return r.Right == nil && plan.UseNext(r.Left)
|
||||
}
|
||||
|
||||
@ -88,10 +88,32 @@ func (r *SelectLockPlan) Filter(ctx context.Context, expr expression.Expression)
|
||||
|
||||
// Next implements plan.Plan Next interface.
|
||||
func (r *SelectLockPlan) Next(ctx context.Context) (row *plan.Row, err error) {
|
||||
row, err = r.Src.Next(ctx)
|
||||
if row == nil || err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if len(row.RowKeys) != 0 && r.Lock == coldef.SelectLockForUpdate {
|
||||
forupdate.SetForUpdate(ctx)
|
||||
txn, err := ctx.GetTxn(false)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
for _, k := range row.RowKeys {
|
||||
err = txn.LockKeys([]byte(k.Key))
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Close implements plan.Plan Close interface.
|
||||
func (r *SelectLockPlan) Close() error {
|
||||
return nil
|
||||
return r.Src.Close()
|
||||
}
|
||||
|
||||
// UseNext implements NextPlan interface
|
||||
func (r *SelectLockPlan) UseNext() bool {
|
||||
return plan.UseNext(r.Src)
|
||||
}
|
||||
|
||||
@ -42,7 +42,7 @@ func (r Recordset) GetFields() []interface{} {
|
||||
|
||||
// Do implements rset.Recordset.
|
||||
func (r Recordset) Do(f func(data []interface{}) (bool, error)) error {
|
||||
if plan.ImplementedNext(r.Plan) {
|
||||
if plan.UseNext(r.Plan) {
|
||||
defer r.Plan.Close()
|
||||
for {
|
||||
row, err := r.Plan.Next(r.Ctx)
|
||||
|
||||
Reference in New Issue
Block a user