executor, plan: support batch index look up join. (#3306)

This commit is contained in:
Han Fei
2017-05-23 23:12:19 +08:00
committed by Shen Li
parent d2557fe49a
commit 3c7df1c302
10 changed files with 389 additions and 37 deletions

View File

@ -96,6 +96,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildMergeJoin(v)
case *plan.PhysicalHashSemiJoin:
return b.buildSemiJoin(v)
case *plan.PhysicalIndexJoin:
return b.buildIndexJoin(v)
case *plan.Selection:
return b.buildSelection(v)
case *plan.PhysicalAggregation:
@ -890,6 +892,20 @@ func (b *executorBuilder) constructDAGReq(plans []plan.PhysicalPlan) *tipb.DAGRe
return dagReq
}
func (b *executorBuilder) buildIndexJoin(v *plan.PhysicalIndexJoin) Executor {
return &IndexLookUpJoin{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, b.build(v.Children()[0])),
innerExec: b.build(v.Children()[1]).(DataReader),
outerJoinKeys: v.OuterJoinKeys,
innerJoinKeys: v.InnerJoinKeys,
outer: v.Outer,
leftConditions: v.LeftConditions,
rightConditions: v.RightConditions,
otherConditions: v.OtherConditions,
defaultValues: v.DefaultValues,
}
}
func (b *executorBuilder) buildTableReader(v *plan.PhysicalTableReader) Executor {
dagReq := b.constructDAGReq(v.TablePlans)
if b.err != nil {

View File

@ -156,6 +156,21 @@ func tableHandlesToKVRanges(tid int64, handles []int64) []kv.KeyRange {
return krs
}
// indexValuesToKVRanges will convert the index datums to kv ranges.
func indexValuesToKVRanges(tid, idxID int64, values [][]types.Datum) ([]kv.KeyRange, error) {
krs := make([]kv.KeyRange, 0, len(values))
for _, vals := range values {
// TODO: We don't process the case that equal key has different types.
valKey, err := codec.EncodeKey(nil, vals...)
if err != nil {
return nil, errors.Trace(err)
}
rangeKey := tablecodec.EncodeIndexSeekKey(tid, idxID, valKey)
krs = append(krs, kv.KeyRange{StartKey: rangeKey, EndKey: rangeKey})
}
return krs, nil
}
func indexRangesToKVRanges(sc *variable.StatementContext, tid, idxID int64, ranges []*types.IndexRange, fieldTypes []*types.FieldType) ([]kv.KeyRange, error) {
krs := make([]kv.KeyRange, 0, len(ranges))
for _, ran := range ranges {

View File

@ -215,19 +215,21 @@ func (s *testSuite) TestJoin(c *C) {
result.Sort().Check(testkit.Rows("1 2 1 0"))
tk.MustExec("drop table if exists t, t1")
tk.MustExec("create table t(a int primary key, b int)")
tk.MustExec("create table t1(a int, b int)")
tk.MustExec("create table t1(a int, b int, key s(b))")
tk.MustExec("insert into t values(1, 1), (2, 2), (3, 3)")
tk.MustExec("insert into t1 values(1, 2), (1, 3), (3, 4), (4, 5)")
// TODO: Open this after refactoring index join.
// The physical plans of the two sql are tested at physical_plan_test.go
//tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ * from t join t1 on t.a=t1.a").Check(testkit.Rows("1 1 1 2", "1 1 1 3", "3 3 3 4"))
//tk.MustQuery("select /*+ TIDB_INLJ(t1) */ * from t1 join t on t.a=t1.a and t.a < t1.b").Check(testkit.Rows("1 2 1 1", "1 3 1 1", "3 4 3 3"))
//tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ * from t right outer join t1 on t.a=t1.a").Check(testkit.Rows("1 1 1 2", "1 1 1 3", "3 3 3 4", "<nil> <nil> 4 5"))
//tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ avg(t.b) from t right outer join t1 on t.a=t1.a").Check(testkit.Rows("1.6667"))
// Test that two conflict hints will return error
//_, err = tk.Exec("select /*+ TIDB_INLJ(t) TIDB_SMJ(t) */ * from t join t1 on t.a=t1.a")
//c.Assert(err, NotNil)
// The physical plans of the two sql are tested at physical_plan_test.go
tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ * from t join t1 on t.a=t1.a").Check(testkit.Rows("1 1 1 2", "1 1 1 3", "3 3 3 4"))
tk.MustQuery("select /*+ TIDB_INLJ(t1) */ * from t1 join t on t.a=t1.a and t.a < t1.b").Check(testkit.Rows("1 2 1 1", "1 3 1 1", "3 4 3 3"))
// Test single index reader.
tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ t1.b from t1 join t on t.b=t1.b").Check(testkit.Rows("2", "3"))
tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ * from t right outer join t1 on t.a=t1.a").Check(testkit.Rows("1 1 1 2", "1 1 1 3", "3 3 3 4", "<nil> <nil> 4 5"))
tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ avg(t.b) from t right outer join t1 on t.a=t1.a").Check(testkit.Rows("1.6667"))
// Test that two conflict hints will return error.
_, err = tk.Exec("select /*+ TIDB_INLJ(t) TIDB_SMJ(t) */ * from t join t1 on t.a=t1.a")
c.Assert(err, NotNil)
}

View File

@ -32,6 +32,13 @@ var (
_ Executor = &IndexLookUpExecutor{}
)
// DataReader can send requests which ranges are constructed by datums.
type DataReader interface {
Executor
doRequestForDatums(datums [][]types.Datum, goCtx goctx.Context) error
}
// TableReaderExecutor sends dag request and reads table data from kv layer.
type TableReaderExecutor struct {
asName *model.CIStr
@ -127,6 +134,16 @@ func (e *TableReaderExecutor) doRequestForHandles(handles []int64, goCtx goctx.C
return nil
}
// doRequestForDatums constructs kv ranges by Datums. It is used by index look up executor.
// Every lens for `datums` will always be one and must be type of int64.
func (e *TableReaderExecutor) doRequestForDatums(datums [][]types.Datum, goCtx goctx.Context) error {
handles := make([]int64, 0, len(datums))
for _, datum := range datums {
handles = append(handles, datum[0].GetInt64())
}
return errors.Trace(e.doRequestForHandles(handles, goCtx))
}
// IndexReaderExecutor sends dag request and reads index data from kv layer.
type IndexReaderExecutor struct {
asName *model.CIStr
@ -217,6 +234,20 @@ func (e *IndexReaderExecutor) Open() error {
return nil
}
// doRequestForDatums constructs kv ranges by datums. It is used by index look up executor.
func (e *IndexReaderExecutor) doRequestForDatums(values [][]types.Datum, goCtx goctx.Context) error {
kvRanges, err := indexValuesToKVRanges(e.tableID, e.index.ID, values)
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc)
if err != nil {
return errors.Trace(err)
}
e.result.Fetch(goCtx)
return nil
}
// IndexLookUpExecutor implements double read for index scan.
type IndexLookUpExecutor struct {
asName *model.CIStr
@ -266,6 +297,20 @@ func (e *IndexLookUpExecutor) Open() error {
return nil
}
// doRequestForDatums constructs kv ranges by datums. It is used by index look up executor.
func (e *IndexLookUpExecutor) doRequestForDatums(values [][]types.Datum, goCtx goctx.Context) error {
kvRanges, err := indexValuesToKVRanges(e.tableID, e.index.ID, values)
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc)
if err != nil {
return errors.Trace(err)
}
e.result.Fetch(goCtx)
return nil
}
// executeTask executes the table look up tasks. We will construct a table reader and send request by handles.
// Then we hold the returning rows and finish this task.
func (e *IndexLookUpExecutor) executeTask(task *lookupTableTask, goCtx goctx.Context) {

253
executor/new_join.go Normal file
View File

@ -0,0 +1,253 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"bytes"
"sort"
"github.com/juju/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
)
type orderedRow struct {
key []byte
row *Row
}
type orderedRows []orderedRow
// Len returns the number of rows.
func (e orderedRows) Len() int {
return len(e)
}
// Swap implements sort.Interface Swap interface.
func (e orderedRows) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}
// Less implements sort.Interface Less interface.
func (e orderedRows) Less(i, j int) bool {
return bytes.Compare(e[i].key, e[j].key) < 0
}
// IndexLookUpJoin fetches batches of data from outer executor and constructs ranges for inner executor.
type IndexLookUpJoin struct {
baseExecutor
innerExec DataReader
cursor int
resultRows []*Row
outerRows orderedRows
innerRows orderedRows
innerDatums orderedRows // innerDatums are extracted by innerRows and innerJoinKeys
exhausted bool // exhausted means whether all data has been extracted
outerJoinKeys []*expression.Column
innerJoinKeys []*expression.Column
leftConditions expression.CNFExprs
rightConditions expression.CNFExprs
otherConditions expression.CNFExprs
defaultValues []types.Datum
outer bool
}
// Open implements the Executor Open interface.
func (e *IndexLookUpJoin) Open() error {
e.cursor = 0
e.exhausted = false
return errors.Trace(e.children[0].Open())
}
// Close implements the Executor Close interface.
func (e *IndexLookUpJoin) Close() error {
e.resultRows = nil
e.outerRows = nil
e.innerDatums = nil
return errors.Trace(e.children[0].Close())
}
// Next implements the Executor Next interface.
// We will fetch batches of row from outer executor, construct the inner datums and sort them.
// At the same time we will fetch the inner row by the inner datums and apply merge join.
func (e *IndexLookUpJoin) Next() (*Row, error) {
for e.cursor == len(e.resultRows) {
if e.exhausted {
return nil, nil
}
batchSize := e.ctx.GetSessionVars().IndexLookupSize
e.outerRows = e.outerRows[:0]
e.resultRows = e.resultRows[:0]
e.innerDatums = e.innerDatums[:0]
for i := 0; i < batchSize; i++ {
outerRow, err := e.children[0].Next()
if err != nil {
return nil, errors.Trace(err)
}
if outerRow == nil {
e.exhausted = true
break
}
match, err := expression.EvalBool(e.leftConditions, outerRow.Data, e.ctx)
if err != nil {
return nil, errors.Trace(err)
}
if match {
joinDatums := make([]types.Datum, 0, len(e.outerJoinKeys))
for _, col := range e.outerJoinKeys {
datum, _ := col.Eval(outerRow.Data)
joinDatums = append(joinDatums, datum)
}
joinOuterEncodeKey, err := codec.EncodeValue(nil, joinDatums...)
if err != nil {
return nil, errors.Trace(err)
}
e.outerRows = append(e.outerRows, orderedRow{key: joinOuterEncodeKey, row: outerRow})
e.innerDatums = append(e.innerDatums, orderedRow{key: joinOuterEncodeKey, row: &Row{Data: joinDatums}})
} else if e.outer {
e.resultRows = append(e.resultRows, e.fillDefaultValues(outerRow))
}
}
sort.Sort(e.outerRows)
err := e.doJoin()
if err != nil {
return nil, errors.Trace(err)
}
e.cursor = 0
}
row := e.resultRows[e.cursor]
e.cursor++
return row, nil
}
func (e *IndexLookUpJoin) fillDefaultValues(row *Row) *Row {
row.Data = append(row.Data, e.defaultValues...)
return row
}
func getUniqueDatums(rows orderedRows) [][]types.Datum {
datums := make([][]types.Datum, 0, rows.Len())
sort.Sort(rows)
for i := range rows {
if i > 0 && bytes.Compare(rows[i-1].key, rows[i].key) == 0 {
continue
}
datums = append(datums, rows[i].row.Data)
}
return datums
}
// doJoin will join the outer rows and inner rows and store them to resultRows.
func (e *IndexLookUpJoin) doJoin() error {
err := e.innerExec.doRequestForDatums(getUniqueDatums(e.innerDatums), e.ctx.GoCtx())
if err != nil {
return errors.Trace(err)
}
defer e.innerExec.Close()
for {
innerRow, err := e.innerExec.Next()
if err != nil {
return errors.Trace(err)
}
if innerRow == nil {
break
}
match, err := expression.EvalBool(e.rightConditions, innerRow.Data, e.ctx)
if err != nil {
return errors.Trace(err)
}
if !match {
continue
}
joinDatums := make([]types.Datum, 0, len(e.innerJoinKeys))
for _, col := range e.innerJoinKeys {
datum, _ := col.Eval(innerRow.Data)
joinDatums = append(joinDatums, datum)
}
joinKey, err := codec.EncodeValue(nil, joinDatums...)
if err != nil {
return errors.Trace(err)
}
e.innerRows = append(e.innerRows, orderedRow{key: joinKey, row: innerRow})
}
sort.Sort(e.innerRows)
return e.doMergeJoin()
}
// getNextCursor will move cursor to the next datum that is different from the previous one and return it.
func getNextCursor(cursor int, rows orderedRows) int {
for {
cursor++
if cursor >= len(rows) {
break
}
c := bytes.Compare(rows[cursor].key, rows[cursor-1].key)
if c != 0 {
break
}
}
return cursor
}
// doMergeJoin joins the innerRows and outerRows which have been sorted before.
func (e *IndexLookUpJoin) doMergeJoin() error {
var outerCursor, innerCursor int
for outerCursor < len(e.outerRows) && innerCursor < len(e.innerRows) {
c := bytes.Compare(e.outerRows[outerCursor].key, e.innerRows[innerCursor].key)
if c == 0 {
outerBeginCursor := outerCursor
outerEndCursor := getNextCursor(outerCursor, e.outerRows)
innerBeginCursor := innerCursor
innerEndCursor := getNextCursor(innerCursor, e.innerRows)
for i := outerBeginCursor; i < outerEndCursor; i++ {
var outerMatch bool
outerRow := e.outerRows[i].row
for j := innerBeginCursor; j < innerEndCursor; j++ {
innerRow := e.innerRows[j].row
joinedRow := makeJoinRow(outerRow, innerRow)
match, err := expression.EvalBool(e.otherConditions, joinedRow.Data, e.ctx)
if err != nil {
return errors.Trace(err)
}
if match {
outerMatch = true
e.resultRows = append(e.resultRows, joinedRow)
}
}
if e.outer && !outerMatch {
e.resultRows = append(e.resultRows, e.fillDefaultValues(outerRow))
}
}
outerCursor, innerCursor = outerEndCursor, innerEndCursor
} else if c < 0 { // outer smaller then inner, move and enlarge outer cursor
outerCursor = getNextCursor(outerCursor, e.outerRows)
outerRow := e.outerRows[outerCursor].row
if e.outer {
e.resultRows = append(e.resultRows, e.fillDefaultValues(outerRow))
}
} else {
innerCursor = getNextCursor(innerCursor, e.outerRows)
}
}
for e.outer && outerCursor < len(e.outerRows) {
outerRow := e.outerRows[outerCursor].row
e.resultRows = append(e.resultRows, e.fillDefaultValues(outerRow))
outerCursor++
}
return nil
}

View File

@ -256,7 +256,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderJoin(c *C) {
// Test Index Join + Order by.
{
sql: "select /*+ TIDB_INLJ(t1, t2) */ t1.a, t2.a from t t1, t t2 where t1.a = t2.a order by t1.c",
best: "IndexJoin{IndexReader(Index(t.c_d_e)[[<nil>,+inf]])->TableReader(Table(t))}(t1.a,t2.a)->Projection->Projection",
best: "IndexJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->Projection->Sort->Projection",
},
// Test Index Join + Order by.
{

View File

@ -148,7 +148,6 @@ func joinKeysMatchIndex(keys []*expression.Column, index *model.IndexInfo) []int
func (p *LogicalJoin) convertToIndexJoin(prop *requiredProp, outerIdx int) (taskProfile, error) {
outerChild := p.children[outerIdx].(LogicalPlan)
innerChild := p.children[1-outerIdx].(LogicalPlan)
canPassProp := len(outerChild.Schema().ColumnsIndices(prop.cols)) > 0
var (
outerTask taskProfile
useTableScan bool
@ -160,11 +159,7 @@ func (p *LogicalJoin) convertToIndexJoin(prop *requiredProp, outerIdx int) (task
innerJoinKeys = make([]*expression.Column, 0, len(p.EqualConditions))
outerJoinKeys = make([]*expression.Column, 0, len(p.EqualConditions))
)
if canPassProp {
outerTask, err = outerChild.convert2NewPhysicalPlan(prop)
} else {
outerTask, err = outerChild.convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType})
}
outerTask, err = outerChild.convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType})
if err != nil {
return nil, errors.Trace(err)
}
@ -234,39 +229,45 @@ func (p *LogicalJoin) convertToIndexJoin(prop *requiredProp, outerIdx int) (task
LeftConditions: leftConds,
RightConditions: rightConds,
OtherConditions: p.OtherConditions,
outer: p.JoinType != InnerJoin,
outerJoinKeys: outerJoinKeys,
innerJoinKeys: innerJoinKeys,
Outer: p.JoinType != InnerJoin,
OuterJoinKeys: outerJoinKeys,
InnerJoinKeys: innerJoinKeys,
DefaultValues: p.DefaultValues,
}.init(p.allocator, p.ctx, p.children[outerIdx], p.children[1-outerIdx])
task := join.attach2TaskProfile(outerTask, innerTask)
if !canPassProp {
task = prop.enforceProperty(task, p.ctx, p.allocator)
}
task = prop.enforceProperty(task, p.ctx, p.allocator)
return task, nil
}
// tryToGetIndexJoin tries to get index join plan. If fails, it returns nil.
// Currently we only check by hint. If we prefer the left index join but the join type is right outer, it will fail to return.
func (p *LogicalJoin) tryToGetIndexJoin(prop *requiredProp) (taskProfile, error) {
func (p *LogicalJoin) tryToGetIndexJoin(prop *requiredProp) (bestTask taskProfile, err error) {
if len(p.EqualConditions) == 0 {
return nil, nil
}
leftOuter := (p.preferINLJ & preferLeftAsOuter) > 0
if leftOuter {
if p.JoinType == RightOuterJoin {
return nil, nil
if p.JoinType != RightOuterJoin {
bestTask, err = p.convertToIndexJoin(prop, 0)
if err != nil {
return nil, errors.Trace(err)
}
}
return p.convertToIndexJoin(prop, 0)
}
rightOuter := (p.preferINLJ & preferRightAsOuter) > 0
if rightOuter {
if p.JoinType == LeftOuterJoin {
return nil, nil
if p.JoinType != LeftOuterJoin {
task, err := p.convertToIndexJoin(prop, 1)
if err != nil {
return nil, errors.Trace(err)
}
if bestTask == nil || bestTask.cost() > task.cost() {
bestTask = task
}
}
return p.convertToIndexJoin(prop, 1)
}
return nil, nil
return
}
// convert2NewPhysicalPlan implements PhysicalPlan interface.

View File

@ -470,9 +470,9 @@ type PhysicalIndexJoin struct {
*basePlan
basePhysicalPlan
outer bool
outerJoinKeys []*expression.Column
innerJoinKeys []*expression.Column
Outer bool
OuterJoinKeys []*expression.Column
InnerJoinKeys []*expression.Column
LeftConditions expression.CNFExprs
RightConditions expression.CNFExprs
OtherConditions expression.CNFExprs

View File

@ -105,6 +105,26 @@ func (p *PhysicalMergeJoin) ResolveIndices() {
}
}
// ResolveIndices implements Plan interface.
func (p *PhysicalIndexJoin) ResolveIndices() {
p.basePlan.ResolveIndices()
lSchema := p.children[0].Schema()
rSchema := p.children[1].Schema()
for i := range p.InnerJoinKeys {
p.OuterJoinKeys[i].ResolveIndices(lSchema)
p.InnerJoinKeys[i].ResolveIndices(rSchema)
}
for _, expr := range p.LeftConditions {
expr.ResolveIndices(lSchema)
}
for _, expr := range p.RightConditions {
expr.ResolveIndices(rSchema)
}
for _, expr := range p.OtherConditions {
expr.ResolveIndices(expression.MergeSchema(lSchema, rSchema))
}
}
// ResolveIndices implements Plan interface.
func (p *PhysicalUnionScan) ResolveIndices() {
for _, expr := range p.Conditions {

View File

@ -172,9 +172,9 @@ func toString(in Plan, strs []string, idxs []int) ([]string, []int) {
strs = strs[:idx]
idxs = idxs[:last]
str = "IndexJoin{" + strings.Join(children, "->") + "}"
for i := range x.outerJoinKeys {
l := x.outerJoinKeys[i]
r := x.innerJoinKeys[i]
for i := range x.OuterJoinKeys {
l := x.OuterJoinKeys[i]
r := x.InnerJoinKeys[i]
str += fmt.Sprintf("(%s,%s)", l, r)
}
default: