diff --git a/executor/builder.go b/executor/builder.go index cfb0dc9a4d..75dda4b180 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -96,6 +96,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor { return b.buildMergeJoin(v) case *plan.PhysicalHashSemiJoin: return b.buildSemiJoin(v) + case *plan.PhysicalIndexJoin: + return b.buildIndexJoin(v) case *plan.Selection: return b.buildSelection(v) case *plan.PhysicalAggregation: @@ -890,6 +892,20 @@ func (b *executorBuilder) constructDAGReq(plans []plan.PhysicalPlan) *tipb.DAGRe return dagReq } +func (b *executorBuilder) buildIndexJoin(v *plan.PhysicalIndexJoin) Executor { + return &IndexLookUpJoin{ + baseExecutor: newBaseExecutor(v.Schema(), b.ctx, b.build(v.Children()[0])), + innerExec: b.build(v.Children()[1]).(DataReader), + outerJoinKeys: v.OuterJoinKeys, + innerJoinKeys: v.InnerJoinKeys, + outer: v.Outer, + leftConditions: v.LeftConditions, + rightConditions: v.RightConditions, + otherConditions: v.OtherConditions, + defaultValues: v.DefaultValues, + } +} + func (b *executorBuilder) buildTableReader(v *plan.PhysicalTableReader) Executor { dagReq := b.constructDAGReq(v.TablePlans) if b.err != nil { diff --git a/executor/distsql.go b/executor/distsql.go index a16f42b90f..e6625d3cac 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -156,6 +156,21 @@ func tableHandlesToKVRanges(tid int64, handles []int64) []kv.KeyRange { return krs } +// indexValuesToKVRanges will convert the index datums to kv ranges. +func indexValuesToKVRanges(tid, idxID int64, values [][]types.Datum) ([]kv.KeyRange, error) { + krs := make([]kv.KeyRange, 0, len(values)) + for _, vals := range values { + // TODO: We don't process the case that equal key has different types. + valKey, err := codec.EncodeKey(nil, vals...) + if err != nil { + return nil, errors.Trace(err) + } + rangeKey := tablecodec.EncodeIndexSeekKey(tid, idxID, valKey) + krs = append(krs, kv.KeyRange{StartKey: rangeKey, EndKey: rangeKey}) + } + return krs, nil +} + func indexRangesToKVRanges(sc *variable.StatementContext, tid, idxID int64, ranges []*types.IndexRange, fieldTypes []*types.FieldType) ([]kv.KeyRange, error) { krs := make([]kv.KeyRange, 0, len(ranges)) for _, ran := range ranges { diff --git a/executor/join_test.go b/executor/join_test.go index 10ee72a01d..ff1ebe6e50 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -215,19 +215,21 @@ func (s *testSuite) TestJoin(c *C) { result.Sort().Check(testkit.Rows("1 2 1 0")) tk.MustExec("drop table if exists t, t1") tk.MustExec("create table t(a int primary key, b int)") - tk.MustExec("create table t1(a int, b int)") + tk.MustExec("create table t1(a int, b int, key s(b))") tk.MustExec("insert into t values(1, 1), (2, 2), (3, 3)") tk.MustExec("insert into t1 values(1, 2), (1, 3), (3, 4), (4, 5)") - // TODO: Open this after refactoring index join. - // The physical plans of the two sql are tested at physical_plan_test.go - //tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ * from t join t1 on t.a=t1.a").Check(testkit.Rows("1 1 1 2", "1 1 1 3", "3 3 3 4")) - //tk.MustQuery("select /*+ TIDB_INLJ(t1) */ * from t1 join t on t.a=t1.a and t.a < t1.b").Check(testkit.Rows("1 2 1 1", "1 3 1 1", "3 4 3 3")) - //tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ * from t right outer join t1 on t.a=t1.a").Check(testkit.Rows("1 1 1 2", "1 1 1 3", "3 3 3 4", " 4 5")) - //tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ avg(t.b) from t right outer join t1 on t.a=t1.a").Check(testkit.Rows("1.6667")) - // Test that two conflict hints will return error - //_, err = tk.Exec("select /*+ TIDB_INLJ(t) TIDB_SMJ(t) */ * from t join t1 on t.a=t1.a") - //c.Assert(err, NotNil) + // The physical plans of the two sql are tested at physical_plan_test.go + tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ * from t join t1 on t.a=t1.a").Check(testkit.Rows("1 1 1 2", "1 1 1 3", "3 3 3 4")) + tk.MustQuery("select /*+ TIDB_INLJ(t1) */ * from t1 join t on t.a=t1.a and t.a < t1.b").Check(testkit.Rows("1 2 1 1", "1 3 1 1", "3 4 3 3")) + // Test single index reader. + tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ t1.b from t1 join t on t.b=t1.b").Check(testkit.Rows("2", "3")) + tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ * from t right outer join t1 on t.a=t1.a").Check(testkit.Rows("1 1 1 2", "1 1 1 3", "3 3 3 4", " 4 5")) + tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ avg(t.b) from t right outer join t1 on t.a=t1.a").Check(testkit.Rows("1.6667")) + + // Test that two conflict hints will return error. + _, err = tk.Exec("select /*+ TIDB_INLJ(t) TIDB_SMJ(t) */ * from t join t1 on t.a=t1.a") + c.Assert(err, NotNil) } diff --git a/executor/new_distsql.go b/executor/new_distsql.go index ac50a1a141..d96d19056c 100644 --- a/executor/new_distsql.go +++ b/executor/new_distsql.go @@ -32,6 +32,13 @@ var ( _ Executor = &IndexLookUpExecutor{} ) +// DataReader can send requests which ranges are constructed by datums. +type DataReader interface { + Executor + + doRequestForDatums(datums [][]types.Datum, goCtx goctx.Context) error +} + // TableReaderExecutor sends dag request and reads table data from kv layer. type TableReaderExecutor struct { asName *model.CIStr @@ -127,6 +134,16 @@ func (e *TableReaderExecutor) doRequestForHandles(handles []int64, goCtx goctx.C return nil } +// doRequestForDatums constructs kv ranges by Datums. It is used by index look up executor. +// Every lens for `datums` will always be one and must be type of int64. +func (e *TableReaderExecutor) doRequestForDatums(datums [][]types.Datum, goCtx goctx.Context) error { + handles := make([]int64, 0, len(datums)) + for _, datum := range datums { + handles = append(handles, datum[0].GetInt64()) + } + return errors.Trace(e.doRequestForHandles(handles, goCtx)) +} + // IndexReaderExecutor sends dag request and reads index data from kv layer. type IndexReaderExecutor struct { asName *model.CIStr @@ -217,6 +234,20 @@ func (e *IndexReaderExecutor) Open() error { return nil } +// doRequestForDatums constructs kv ranges by datums. It is used by index look up executor. +func (e *IndexReaderExecutor) doRequestForDatums(values [][]types.Datum, goCtx goctx.Context) error { + kvRanges, err := indexValuesToKVRanges(e.tableID, e.index.ID, values) + if err != nil { + return errors.Trace(err) + } + e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc) + if err != nil { + return errors.Trace(err) + } + e.result.Fetch(goCtx) + return nil +} + // IndexLookUpExecutor implements double read for index scan. type IndexLookUpExecutor struct { asName *model.CIStr @@ -266,6 +297,20 @@ func (e *IndexLookUpExecutor) Open() error { return nil } +// doRequestForDatums constructs kv ranges by datums. It is used by index look up executor. +func (e *IndexLookUpExecutor) doRequestForDatums(values [][]types.Datum, goCtx goctx.Context) error { + kvRanges, err := indexValuesToKVRanges(e.tableID, e.index.ID, values) + if err != nil { + return errors.Trace(err) + } + e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc) + if err != nil { + return errors.Trace(err) + } + e.result.Fetch(goCtx) + return nil +} + // executeTask executes the table look up tasks. We will construct a table reader and send request by handles. // Then we hold the returning rows and finish this task. func (e *IndexLookUpExecutor) executeTask(task *lookupTableTask, goCtx goctx.Context) { diff --git a/executor/new_join.go b/executor/new_join.go new file mode 100644 index 0000000000..849dbec0ce --- /dev/null +++ b/executor/new_join.go @@ -0,0 +1,253 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "bytes" + "sort" + + "github.com/juju/errors" + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/types" +) + +type orderedRow struct { + key []byte + row *Row +} + +type orderedRows []orderedRow + +// Len returns the number of rows. +func (e orderedRows) Len() int { + return len(e) +} + +// Swap implements sort.Interface Swap interface. +func (e orderedRows) Swap(i, j int) { + e[i], e[j] = e[j], e[i] +} + +// Less implements sort.Interface Less interface. +func (e orderedRows) Less(i, j int) bool { + return bytes.Compare(e[i].key, e[j].key) < 0 +} + +// IndexLookUpJoin fetches batches of data from outer executor and constructs ranges for inner executor. +type IndexLookUpJoin struct { + baseExecutor + + innerExec DataReader + + cursor int + resultRows []*Row + outerRows orderedRows + innerRows orderedRows + innerDatums orderedRows // innerDatums are extracted by innerRows and innerJoinKeys + exhausted bool // exhausted means whether all data has been extracted + + outerJoinKeys []*expression.Column + innerJoinKeys []*expression.Column + leftConditions expression.CNFExprs + rightConditions expression.CNFExprs + otherConditions expression.CNFExprs + defaultValues []types.Datum + outer bool +} + +// Open implements the Executor Open interface. +func (e *IndexLookUpJoin) Open() error { + e.cursor = 0 + e.exhausted = false + return errors.Trace(e.children[0].Open()) +} + +// Close implements the Executor Close interface. +func (e *IndexLookUpJoin) Close() error { + e.resultRows = nil + e.outerRows = nil + e.innerDatums = nil + return errors.Trace(e.children[0].Close()) +} + +// Next implements the Executor Next interface. +// We will fetch batches of row from outer executor, construct the inner datums and sort them. +// At the same time we will fetch the inner row by the inner datums and apply merge join. +func (e *IndexLookUpJoin) Next() (*Row, error) { + for e.cursor == len(e.resultRows) { + if e.exhausted { + return nil, nil + } + batchSize := e.ctx.GetSessionVars().IndexLookupSize + e.outerRows = e.outerRows[:0] + e.resultRows = e.resultRows[:0] + e.innerDatums = e.innerDatums[:0] + for i := 0; i < batchSize; i++ { + outerRow, err := e.children[0].Next() + if err != nil { + return nil, errors.Trace(err) + } + if outerRow == nil { + e.exhausted = true + break + } + match, err := expression.EvalBool(e.leftConditions, outerRow.Data, e.ctx) + if err != nil { + return nil, errors.Trace(err) + } + if match { + joinDatums := make([]types.Datum, 0, len(e.outerJoinKeys)) + for _, col := range e.outerJoinKeys { + datum, _ := col.Eval(outerRow.Data) + joinDatums = append(joinDatums, datum) + } + joinOuterEncodeKey, err := codec.EncodeValue(nil, joinDatums...) + if err != nil { + return nil, errors.Trace(err) + } + e.outerRows = append(e.outerRows, orderedRow{key: joinOuterEncodeKey, row: outerRow}) + e.innerDatums = append(e.innerDatums, orderedRow{key: joinOuterEncodeKey, row: &Row{Data: joinDatums}}) + } else if e.outer { + e.resultRows = append(e.resultRows, e.fillDefaultValues(outerRow)) + } + } + sort.Sort(e.outerRows) + err := e.doJoin() + if err != nil { + return nil, errors.Trace(err) + } + e.cursor = 0 + } + row := e.resultRows[e.cursor] + e.cursor++ + return row, nil +} + +func (e *IndexLookUpJoin) fillDefaultValues(row *Row) *Row { + row.Data = append(row.Data, e.defaultValues...) + return row +} + +func getUniqueDatums(rows orderedRows) [][]types.Datum { + datums := make([][]types.Datum, 0, rows.Len()) + sort.Sort(rows) + for i := range rows { + if i > 0 && bytes.Compare(rows[i-1].key, rows[i].key) == 0 { + continue + } + datums = append(datums, rows[i].row.Data) + } + return datums +} + +// doJoin will join the outer rows and inner rows and store them to resultRows. +func (e *IndexLookUpJoin) doJoin() error { + err := e.innerExec.doRequestForDatums(getUniqueDatums(e.innerDatums), e.ctx.GoCtx()) + if err != nil { + return errors.Trace(err) + } + defer e.innerExec.Close() + for { + innerRow, err := e.innerExec.Next() + if err != nil { + return errors.Trace(err) + } + if innerRow == nil { + break + } + match, err := expression.EvalBool(e.rightConditions, innerRow.Data, e.ctx) + if err != nil { + return errors.Trace(err) + } + if !match { + continue + } + joinDatums := make([]types.Datum, 0, len(e.innerJoinKeys)) + for _, col := range e.innerJoinKeys { + datum, _ := col.Eval(innerRow.Data) + joinDatums = append(joinDatums, datum) + } + joinKey, err := codec.EncodeValue(nil, joinDatums...) + if err != nil { + return errors.Trace(err) + } + e.innerRows = append(e.innerRows, orderedRow{key: joinKey, row: innerRow}) + } + sort.Sort(e.innerRows) + return e.doMergeJoin() +} + +// getNextCursor will move cursor to the next datum that is different from the previous one and return it. +func getNextCursor(cursor int, rows orderedRows) int { + for { + cursor++ + if cursor >= len(rows) { + break + } + c := bytes.Compare(rows[cursor].key, rows[cursor-1].key) + if c != 0 { + break + } + } + return cursor +} + +// doMergeJoin joins the innerRows and outerRows which have been sorted before. +func (e *IndexLookUpJoin) doMergeJoin() error { + var outerCursor, innerCursor int + for outerCursor < len(e.outerRows) && innerCursor < len(e.innerRows) { + c := bytes.Compare(e.outerRows[outerCursor].key, e.innerRows[innerCursor].key) + if c == 0 { + outerBeginCursor := outerCursor + outerEndCursor := getNextCursor(outerCursor, e.outerRows) + innerBeginCursor := innerCursor + innerEndCursor := getNextCursor(innerCursor, e.innerRows) + for i := outerBeginCursor; i < outerEndCursor; i++ { + var outerMatch bool + outerRow := e.outerRows[i].row + for j := innerBeginCursor; j < innerEndCursor; j++ { + innerRow := e.innerRows[j].row + joinedRow := makeJoinRow(outerRow, innerRow) + match, err := expression.EvalBool(e.otherConditions, joinedRow.Data, e.ctx) + if err != nil { + return errors.Trace(err) + } + if match { + outerMatch = true + e.resultRows = append(e.resultRows, joinedRow) + } + } + if e.outer && !outerMatch { + e.resultRows = append(e.resultRows, e.fillDefaultValues(outerRow)) + } + } + outerCursor, innerCursor = outerEndCursor, innerEndCursor + } else if c < 0 { // outer smaller then inner, move and enlarge outer cursor + outerCursor = getNextCursor(outerCursor, e.outerRows) + outerRow := e.outerRows[outerCursor].row + if e.outer { + e.resultRows = append(e.resultRows, e.fillDefaultValues(outerRow)) + } + } else { + innerCursor = getNextCursor(innerCursor, e.outerRows) + } + } + for e.outer && outerCursor < len(e.outerRows) { + outerRow := e.outerRows[outerCursor].row + e.resultRows = append(e.resultRows, e.fillDefaultValues(outerRow)) + outerCursor++ + } + return nil +} diff --git a/plan/dag_plan_test.go b/plan/dag_plan_test.go index 811a8b7404..018b369f0a 100644 --- a/plan/dag_plan_test.go +++ b/plan/dag_plan_test.go @@ -256,7 +256,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderJoin(c *C) { // Test Index Join + Order by. { sql: "select /*+ TIDB_INLJ(t1, t2) */ t1.a, t2.a from t t1, t t2 where t1.a = t2.a order by t1.c", - best: "IndexJoin{IndexReader(Index(t.c_d_e)[[,+inf]])->TableReader(Table(t))}(t1.a,t2.a)->Projection->Projection", + best: "IndexJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->Projection->Sort->Projection", }, // Test Index Join + Order by. { diff --git a/plan/new_physical_plan_builder.go b/plan/new_physical_plan_builder.go index aa57f8c207..78a3b86c02 100644 --- a/plan/new_physical_plan_builder.go +++ b/plan/new_physical_plan_builder.go @@ -148,7 +148,6 @@ func joinKeysMatchIndex(keys []*expression.Column, index *model.IndexInfo) []int func (p *LogicalJoin) convertToIndexJoin(prop *requiredProp, outerIdx int) (taskProfile, error) { outerChild := p.children[outerIdx].(LogicalPlan) innerChild := p.children[1-outerIdx].(LogicalPlan) - canPassProp := len(outerChild.Schema().ColumnsIndices(prop.cols)) > 0 var ( outerTask taskProfile useTableScan bool @@ -160,11 +159,7 @@ func (p *LogicalJoin) convertToIndexJoin(prop *requiredProp, outerIdx int) (task innerJoinKeys = make([]*expression.Column, 0, len(p.EqualConditions)) outerJoinKeys = make([]*expression.Column, 0, len(p.EqualConditions)) ) - if canPassProp { - outerTask, err = outerChild.convert2NewPhysicalPlan(prop) - } else { - outerTask, err = outerChild.convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType}) - } + outerTask, err = outerChild.convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType}) if err != nil { return nil, errors.Trace(err) } @@ -234,39 +229,45 @@ func (p *LogicalJoin) convertToIndexJoin(prop *requiredProp, outerIdx int) (task LeftConditions: leftConds, RightConditions: rightConds, OtherConditions: p.OtherConditions, - outer: p.JoinType != InnerJoin, - outerJoinKeys: outerJoinKeys, - innerJoinKeys: innerJoinKeys, + Outer: p.JoinType != InnerJoin, + OuterJoinKeys: outerJoinKeys, + InnerJoinKeys: innerJoinKeys, + DefaultValues: p.DefaultValues, }.init(p.allocator, p.ctx, p.children[outerIdx], p.children[1-outerIdx]) task := join.attach2TaskProfile(outerTask, innerTask) - if !canPassProp { - task = prop.enforceProperty(task, p.ctx, p.allocator) - } + task = prop.enforceProperty(task, p.ctx, p.allocator) return task, nil } // tryToGetIndexJoin tries to get index join plan. If fails, it returns nil. // Currently we only check by hint. If we prefer the left index join but the join type is right outer, it will fail to return. -func (p *LogicalJoin) tryToGetIndexJoin(prop *requiredProp) (taskProfile, error) { +func (p *LogicalJoin) tryToGetIndexJoin(prop *requiredProp) (bestTask taskProfile, err error) { if len(p.EqualConditions) == 0 { return nil, nil } leftOuter := (p.preferINLJ & preferLeftAsOuter) > 0 if leftOuter { - if p.JoinType == RightOuterJoin { - return nil, nil + if p.JoinType != RightOuterJoin { + bestTask, err = p.convertToIndexJoin(prop, 0) + if err != nil { + return nil, errors.Trace(err) + } } - return p.convertToIndexJoin(prop, 0) } rightOuter := (p.preferINLJ & preferRightAsOuter) > 0 if rightOuter { - if p.JoinType == LeftOuterJoin { - return nil, nil + if p.JoinType != LeftOuterJoin { + task, err := p.convertToIndexJoin(prop, 1) + if err != nil { + return nil, errors.Trace(err) + } + if bestTask == nil || bestTask.cost() > task.cost() { + bestTask = task + } } - return p.convertToIndexJoin(prop, 1) } - return nil, nil + return } // convert2NewPhysicalPlan implements PhysicalPlan interface. diff --git a/plan/physical_plans.go b/plan/physical_plans.go index 95325eaf5b..5970c8af12 100644 --- a/plan/physical_plans.go +++ b/plan/physical_plans.go @@ -470,9 +470,9 @@ type PhysicalIndexJoin struct { *basePlan basePhysicalPlan - outer bool - outerJoinKeys []*expression.Column - innerJoinKeys []*expression.Column + Outer bool + OuterJoinKeys []*expression.Column + InnerJoinKeys []*expression.Column LeftConditions expression.CNFExprs RightConditions expression.CNFExprs OtherConditions expression.CNFExprs diff --git a/plan/resolve_indices.go b/plan/resolve_indices.go index 3cffc82655..baae379efd 100644 --- a/plan/resolve_indices.go +++ b/plan/resolve_indices.go @@ -105,6 +105,26 @@ func (p *PhysicalMergeJoin) ResolveIndices() { } } +// ResolveIndices implements Plan interface. +func (p *PhysicalIndexJoin) ResolveIndices() { + p.basePlan.ResolveIndices() + lSchema := p.children[0].Schema() + rSchema := p.children[1].Schema() + for i := range p.InnerJoinKeys { + p.OuterJoinKeys[i].ResolveIndices(lSchema) + p.InnerJoinKeys[i].ResolveIndices(rSchema) + } + for _, expr := range p.LeftConditions { + expr.ResolveIndices(lSchema) + } + for _, expr := range p.RightConditions { + expr.ResolveIndices(rSchema) + } + for _, expr := range p.OtherConditions { + expr.ResolveIndices(expression.MergeSchema(lSchema, rSchema)) + } +} + // ResolveIndices implements Plan interface. func (p *PhysicalUnionScan) ResolveIndices() { for _, expr := range p.Conditions { diff --git a/plan/stringer.go b/plan/stringer.go index 0969a4e3ad..fb97693d82 100644 --- a/plan/stringer.go +++ b/plan/stringer.go @@ -172,9 +172,9 @@ func toString(in Plan, strs []string, idxs []int) ([]string, []int) { strs = strs[:idx] idxs = idxs[:last] str = "IndexJoin{" + strings.Join(children, "->") + "}" - for i := range x.outerJoinKeys { - l := x.outerJoinKeys[i] - r := x.innerJoinKeys[i] + for i := range x.OuterJoinKeys { + l := x.OuterJoinKeys[i] + r := x.InnerJoinKeys[i] str += fmt.Sprintf("(%s,%s)", l, r) } default: