From 4ec288cb57e9eb2cebc29d2d64b5dbc420bfc950 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Sat, 12 Sep 2015 18:05:25 +0800 Subject: [PATCH] plans: Implement next for simple table selection. --- plan/plan.go | 8 +++--- plan/plans/explain.go | 4 +-- plan/plans/fields.go | 43 +++++++++++++++++++++++++++++-- plan/plans/final.go | 28 ++++++++++++++++++++- plan/plans/from.go | 56 +++++++++++++++++++++++++++++++++++++++++ plan/plans/from_test.go | 4 ++- plan/plans/join.go | 9 +++++-- plan/plans/lock.go | 24 +++++++++++++++++- rset/rsets/rsets.go | 2 +- 9 files changed, 164 insertions(+), 14 deletions(-) diff --git a/plan/plan.go b/plan/plan.go index 1d28ddff50..982fdd371e 100644 --- a/plan/plan.go +++ b/plan/plan.go @@ -52,13 +52,13 @@ type Plan interface { // NextPlan is the interface for plans that has implemented next. type NextPlan interface { - ImplementedNext() bool + UseNext() bool } -// ImplementedNext checks if p has implemented Next method. -func ImplementedNext(p Plan) bool { +// UseNext checks if p has implemented Next method. +func UseNext(p Plan) bool { if np, ok := p.(NextPlan); ok { - return np.ImplementedNext() + return np.UseNext() } return false } diff --git a/plan/plans/explain.go b/plan/plans/explain.go index 9ae03dfb94..abea078298 100644 --- a/plan/plans/explain.go +++ b/plan/plans/explain.go @@ -95,7 +95,7 @@ func (r *ExplainDefaultPlan) Close() error { return nil } -// ImplementedNext implements NextPlan interface. -func (r *ExplainDefaultPlan) ImplementedNext() bool { +// UseNext implements NextPlan interface +func (r *ExplainDefaultPlan) UseNext() bool { return true } diff --git a/plan/plans/fields.go b/plan/plans/fields.go index 143a96119d..96aa9ca5b6 100644 --- a/plan/plans/fields.go +++ b/plan/plans/fields.go @@ -20,6 +20,7 @@ package plans import ( "fmt" + "github.com/juju/errors" "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/expressions" @@ -36,7 +37,8 @@ var ( // SelectFieldsDefaultPlan extracts specific fields from Src Plan. type SelectFieldsDefaultPlan struct { *SelectList - Src plan.Plan + Src plan.Plan + evalArgs map[interface{}]interface{} } // Explain implements the plan.Plan Explain interface. @@ -78,17 +80,42 @@ func (r *SelectFieldsDefaultPlan) Do(ctx context.Context, f plan.RowIterFunc) er // Next implements plan.Plan Next interface. func (r *SelectFieldsDefaultPlan) Next(ctx context.Context) (row *plan.Row, err error) { + if r.evalArgs == nil { + r.evalArgs = map[interface{}]interface{}{} + } + srcRow, err := r.Src.Next(ctx) + if err != nil || srcRow == nil { + return nil, errors.Trace(err) + } + r.evalArgs[expressions.ExprEvalIdentFunc] = func(name string) (interface{}, error) { + return getIdentValue(name, r.Src.GetFields(), srcRow.Data, field.DefaultFieldFlag) + } + row = &plan.Row{ + Data: make([]interface{}, len(r.Fields)), + } + for i, fld := range r.Fields { + var err error + if row.Data[i], err = fld.Expr.Eval(ctx, r.evalArgs); err != nil { + return nil, errors.Trace(err) + } + } return } // Close implements plan.Plan Close interface. func (r *SelectFieldsDefaultPlan) Close() error { - return nil + return r.Src.Close() +} + +// UseNext implements NextPlan interface +func (r *SelectFieldsDefaultPlan) UseNext() bool { + return plan.UseNext(r.Src) } // SelectEmptyFieldListPlan is the plan for "select expr, expr, ..."" with no FROM. type SelectEmptyFieldListPlan struct { Fields []*field.Field + done bool } // Do implements the plan.Plan Do interface, returns empty row. @@ -128,6 +155,13 @@ func (s *SelectEmptyFieldListPlan) Filter(ctx context.Context, expr expression.E // Next implements plan.Plan Next interface. func (s *SelectEmptyFieldListPlan) Next(ctx context.Context) (row *plan.Row, err error) { + if s.done { + return + } + row = &plan.Row{ + Data: make([]interface{}, len(s.Fields)), + } + s.done = true return } @@ -135,3 +169,8 @@ func (s *SelectEmptyFieldListPlan) Next(ctx context.Context) (row *plan.Row, err func (s *SelectEmptyFieldListPlan) Close() error { return nil } + +// UseNext implements NextPlan interface. +func (s *SelectEmptyFieldListPlan) UseNext() bool { + return true +} diff --git a/plan/plans/final.go b/plan/plans/final.go index bc00c8f618..c991803d6a 100644 --- a/plan/plans/final.go +++ b/plan/plans/final.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/field" "github.com/pingcap/tidb/plan" "github.com/pingcap/tidb/util/format" + "github.com/reborndb/go/errors" ) var ( @@ -81,10 +82,35 @@ func (r *SelectFinalPlan) Filter(ctx context.Context, expr expression.Expression // Next implements plan.Plan Next interface. func (r *SelectFinalPlan) Next(ctx context.Context) (row *plan.Row, err error) { + row, err = r.Src.Next(ctx) + if row == nil || err != nil { + return nil, errors.Trace(err) + } + row.Data = row.Data[:r.HiddenFieldOffset] + for i, o := range row.Data { + switch v := o.(type) { + case bool: + // Convert bool field to int + if v { + row.Data[i] = uint8(1) + } else { + row.Data[i] = uint8(0) + } + } + } + if !r.infered { + setResultFieldInfo(r.ResultFields[0:r.HiddenFieldOffset], row.Data) + r.infered = true + } return } // Close implements plan.Plan Close interface. func (r *SelectFinalPlan) Close() error { - return nil + return r.Src.Close() +} + +// UseNext implements NextPlan interface +func (r *SelectFinalPlan) UseNext() bool { + return plan.UseNext(r.Src) } diff --git a/plan/plans/from.go b/plan/plans/from.go index d8cec724ed..2b7031507c 100644 --- a/plan/plans/from.go +++ b/plan/plans/from.go @@ -21,6 +21,7 @@ import ( "strings" "github.com/juju/errors" + "github.com/ngaut/log" "github.com/pingcap/tidb/column" "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/expression" @@ -110,6 +111,7 @@ func (r *TableNilPlan) Close() error { type TableDefaultPlan struct { T table.Table Fields []*field.ResultField + iter kv.Iterator } // Explain implements the plan.Plan Explain interface. @@ -328,10 +330,64 @@ func (r *TableDefaultPlan) GetFields() []*field.ResultField { // Next implements plan.Plan Next interface. func (r *TableDefaultPlan) Next(ctx context.Context) (row *plan.Row, err error) { + if r.iter == nil { + var txn kv.Transaction + txn, err = ctx.GetTxn(false) + if err != nil { + return nil, errors.Trace(err) + } + r.iter, err = txn.Seek([]byte(r.T.FirstKey()), nil) + if err != nil { + return nil, errors.Trace(err) + } + } + if !r.iter.Valid() || !strings.HasPrefix(r.iter.Key(), r.T.KeyPrefix()) { + return + } + // TODO: check if lock valid + // the record layout in storage (key -> value): + // r1 -> lock-version + // r1_col1 -> r1 col1 value + // r1_col2 -> r1 col2 value + // r2 -> lock-version + // r2_col1 -> r2 col1 value + // r2_col2 -> r2 col2 value + // ... + rowKey := r.iter.Key() + h, err := util.DecodeHandleFromRowKey(rowKey) + if err != nil { + return nil, errors.Trace(err) + } + + // TODO: we could just fetch mentioned columns' values + row = &plan.Row{} + row.Data, err = r.T.Row(ctx, h) + if err != nil { + return nil, errors.Trace(err) + } + // Put rowKey to the tail of record row + rke := &plan.RowKeyEntry{ + Tbl: r.T, + Key: rowKey, + } + row.RowKeys = append(row.RowKeys, rke) + + rk := r.T.RecordKey(h, nil) + r.iter, err = kv.NextUntil(r.iter, util.RowKeyPrefixFilter(rk)) + if err != nil { + return nil, errors.Trace(err) + } return } // Close implements plan.Plan Close interface. func (r *TableDefaultPlan) Close() error { + r.iter.Close() return nil } + +// UseNext implements NextPlan interface +func (r *TableDefaultPlan) UseNext() bool { + log.Warn("use next") + return true +} diff --git a/plan/plans/from_test.go b/plan/plans/from_test.go index 0f6b77bfe7..7347901522 100644 --- a/plan/plans/from_test.go +++ b/plan/plans/from_test.go @@ -27,6 +27,7 @@ import ( mysql "github.com/pingcap/tidb/mysqldef" "github.com/pingcap/tidb/parser/opcode" "github.com/pingcap/tidb/plan/plans" + "github.com/pingcap/tidb/rset/rsets" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" @@ -125,7 +126,8 @@ func (p *testFromSuit) TestTableDefaultPlan(c *C) { } ret := map[int64]string{} - pln.Do(p, func(id interface{}, data []interface{}) (bool, error) { + rset := rsets.Recordset{Ctx: p, Plan: pln} + rset.Do(func(data []interface{}) (bool, error) { ret[data[0].(int64)] = data[1].(string) return true, nil }) diff --git a/plan/plans/join.go b/plan/plans/join.go index d129631491..8717da7471 100644 --- a/plan/plans/join.go +++ b/plan/plans/join.go @@ -345,10 +345,15 @@ func appendRow(prefix []interface{}, in []interface{}) []interface{} { // Next implements plan.Plan Next interface. func (r *JoinPlan) Next(ctx context.Context) (row *plan.Row, err error) { - return + return r.Left.Next(ctx) } // Close implements plan.Plan Close interface. func (r *JoinPlan) Close() error { - return nil + return r.Left.Close() +} + +// UseNext implements NextPlan interface +func (r *JoinPlan) UseNext() bool { + return r.Right == nil && plan.UseNext(r.Left) } diff --git a/plan/plans/lock.go b/plan/plans/lock.go index 1308faf8fe..aa8bdf520d 100644 --- a/plan/plans/lock.go +++ b/plan/plans/lock.go @@ -88,10 +88,32 @@ func (r *SelectLockPlan) Filter(ctx context.Context, expr expression.Expression) // Next implements plan.Plan Next interface. func (r *SelectLockPlan) Next(ctx context.Context) (row *plan.Row, err error) { + row, err = r.Src.Next(ctx) + if row == nil || err != nil { + return nil, errors.Trace(err) + } + if len(row.RowKeys) != 0 && r.Lock == coldef.SelectLockForUpdate { + forupdate.SetForUpdate(ctx) + txn, err := ctx.GetTxn(false) + if err != nil { + return nil, errors.Trace(err) + } + for _, k := range row.RowKeys { + err = txn.LockKeys([]byte(k.Key)) + if err != nil { + return nil, errors.Trace(err) + } + } + } return } // Close implements plan.Plan Close interface. func (r *SelectLockPlan) Close() error { - return nil + return r.Src.Close() +} + +// UseNext implements NextPlan interface +func (r *SelectLockPlan) UseNext() bool { + return plan.UseNext(r.Src) } diff --git a/rset/rsets/rsets.go b/rset/rsets/rsets.go index 3858dff1eb..bf67a4ae76 100644 --- a/rset/rsets/rsets.go +++ b/rset/rsets/rsets.go @@ -42,7 +42,7 @@ func (r Recordset) GetFields() []interface{} { // Do implements rset.Recordset. func (r Recordset) Do(f func(data []interface{}) (bool, error)) error { - if plan.ImplementedNext(r.Plan) { + if plan.UseNext(r.Plan) { defer r.Plan.Close() for { row, err := r.Plan.Next(r.Ctx)