Files
tidb/executor/join.go

840 lines
22 KiB
Go

// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"sync"
"sync/atomic"
"github.com/juju/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/mvmap"
goctx "golang.org/x/net/context"
)
// Compile-time assertions that the executors defined in this file satisfy
// the interfaces they are used through.
var (
_ joinExec = &NestedLoopJoinExec{}
_ Executor = &HashJoinExec{}
_ joinExec = &HashSemiJoinExec{}
_ Executor = &ApplyJoinExec{}
)
// HashJoinExec implements the hash join algorithm.
// prepare builds a hash table from all rows of the inner executor, one
// goroutine (fetchOuterRows) reads the outer executor, and e.concurrency
// join workers (runJoinWorker) probe the hash table and emit result batches.
type HashJoinExec struct {
baseExecutor
// outerExec is the executor read by fetchOuterRows (the "big table").
outerExec Executor
// innerlExec is the executor fully consumed by prepare to build hashTable.
innerlExec Executor
// outerFilter is evaluated on every outer row in filterOuters.
outerFilter expression.CNFExprs
// innerFilter is evaluated on every inner row in prepare; rows that do not
// match are not inserted into hashTable.
innerFilter expression.CNFExprs
// outerKeys and innerKeys are the columns hashed to form the join key on
// the outer and inner side respectively.
outerKeys []*expression.Column
innerKeys []*expression.Column
// prepared is set by prepare after the hash table has been built.
prepared bool
concurrency int // concurrency is number of concurrent channels and join workers.
// hashTable maps a hashed join key to the encoded inner rows having that key.
hashTable *mvmap.MVMap
// hashJoinBuffers holds one scratch buffer per join worker; allocated in Open.
hashJoinBuffers []*hashJoinBuffer
// outerBufferChs holds one channel per join worker, filled by fetchOuterRows.
outerBufferChs []chan *execResult
workerWaitGroup sync.WaitGroup // workerWaitGroup is for sync multiple join workers.
// finished stores a bool. It is set to true by Close and by Next when a
// result carries an error; the background goroutines poll it to stop early.
finished atomic.Value
// closeCh is closed by waitJoinWorkersAndCloseResultChan once every worker
// has exited; Close blocks on it.
closeCh chan struct{}
// NOTE(review): defaultInners is not referenced in this part of the file;
// presumably the default inner-side values for outer joins — confirm.
defaultInners []types.Datum
// joinType is consulted in prepare to skip the probe phase for an inner
// join whose hash table is empty.
joinType plan.JoinType
// resultGenerator turns matched/unmatched outer rows into result rows.
resultGenerator joinResultGenerator
resultBufferCh chan *execResult // Channels for output.
// resultBuffer is the batch currently being returned by Next, and
// resultCursor is the index of the next row to return from it.
resultBuffer []Row
resultCursor int
}
// hashJoinBuffer is the reusable scratch space handed to getJoinKey.
type hashJoinBuffer struct {
// data receives the evaluated join key datums.
data []types.Datum
// bytes is the backing storage reused for the hashed join key.
bytes []byte
}
// Close implements the Executor Close interface.
//
// It first marks the join as finished so the background goroutines stop at
// their next check, then drains the result channel and waits for all of them
// to exit, and only afterwards closes the embedded baseExecutor. Waiting
// before closing keeps fetchOuterRows from calling Next on outerExec while it
// is being closed (assuming baseExecutor.Close closes the children — confirm),
// and guarantees the goroutines are not leaked when baseExecutor.Close
// returns an error.
func (e *HashJoinExec) Close() error {
	e.finished.Store(true)

	if e.prepared {
		// Workers may be blocked sending into resultBufferCh; keep receiving
		// until waitJoinWorkersAndCloseResultChan closes it.
		for range e.resultBufferCh {
		}
		// closeCh is closed right after resultBufferCh, once every worker
		// has called Done.
		<-e.closeCh
	}
	e.resultBuffer = nil

	return errors.Trace(e.baseExecutor.Close())
}
// Open implements the Executor Open interface.
// It opens the embedded baseExecutor and resets all per-execution state,
// allocating one scratch buffer for each join worker.
func (e *HashJoinExec) Open(goCtx goctx.Context) error {
	if err := e.baseExecutor.Open(goCtx); err != nil {
		return errors.Trace(err)
	}

	e.prepared = false

	// One scratch buffer per worker so workers never share key storage.
	buffers := make([]*hashJoinBuffer, e.concurrency)
	for idx := range buffers {
		buffers[idx] = &hashJoinBuffer{
			data:  make([]types.Datum, len(e.outerKeys)),
			bytes: make([]byte, 0, 10000),
		}
	}
	e.hashJoinBuffers = buffers

	e.closeCh = make(chan struct{})
	e.finished.Store(false)
	e.workerWaitGroup = sync.WaitGroup{}
	e.resultCursor = 0
	return nil
}
// makeJoinRow returns a freshly allocated row holding the datums of a
// followed by the datums of b. Neither input is modified.
func makeJoinRow(a Row, b Row) Row {
	joined := make([]types.Datum, len(a)+len(b))
	n := copy(joined, a)
	copy(joined[n:], b)
	return joined
}
// encodeRow appends the tablecodec encoding of every datum in row to b,
// using the session time zone, and returns the extended slice.
// On an encoding error it returns a nil slice and the traced error.
func (e *HashJoinExec) encodeRow(b []byte, row Row) ([]byte, error) {
	tz := e.ctx.GetSessionVars().GetTimeZone()
	for idx := range row {
		encoded, err := tablecodec.EncodeValue(row[idx], tz)
		if err != nil {
			return nil, errors.Trace(err)
		}
		b = append(b, encoded...)
	}
	return b, nil
}
// decodeRow turns bytes produced by encodeRow back into a row shaped like
// the inner executor's schema.
func (e *HashJoinExec) decodeRow(data []byte) (Row, error) {
	innerSchema := e.innerlExec.Schema()
	row := make([]types.Datum, innerSchema.Len())

	// First split the buffer into raw per-column values ...
	if err := codec.SetRawValues(data, row); err != nil {
		return nil, errors.Trace(err)
	}
	// ... then decode each raw value according to its column.
	if err := decodeRawValues(row, innerSchema, e.ctx.GetSessionVars().GetTimeZone()); err != nil {
		return nil, errors.Trace(err)
	}
	return row, nil
}
// getJoinKey evaluates cols against row into vals and hashes the result.
// The first return value reports whether any key column evaluated to NULL,
// in which case no key is produced. The second is the hash key, appended to
// bytes; it is nil when vals is empty. vals must have room for len(cols)
// datums.
func getJoinKey(cols []*expression.Column, row Row, vals []types.Datum, bytes []byte) (bool, []byte, error) {
	for idx := range cols {
		var evalErr error
		vals[idx], evalErr = cols[idx].Eval(row)
		if evalErr != nil {
			return false, nil, errors.Trace(evalErr)
		}
		if vals[idx].IsNull() {
			return true, nil, nil
		}
	}

	if len(vals) == 0 {
		return false, nil, nil
	}

	key, hashErr := codec.HashValues(bytes, vals...)
	return false, key, errors.Trace(hashErr)
}
// fetchOuterRows fetches rows from the big table in a background goroutine
// and sends the rows to multiple channels which will be read by multiple join workers.
// Batches are handed out round-robin over e.outerBufferChs. On exit it closes
// every outer channel and calls Done on the worker wait group.
func (e *HashJoinExec) fetchOuterRows(goCtx goctx.Context) {
defer func() {
// Closing the channels lets each join worker leave its receive loop.
for _, outerBufferCh := range e.outerBufferChs {
close(outerBufferCh)
}
e.workerWaitGroup.Done()
}()
// The batch size starts at 1 and doubles after each send until it reaches
// maxBufferCapacity.
bufferCapacity, maxBufferCapacity := 1, 128
for i, noMoreData := 0, false; !noMoreData; i = (i + 1) % e.concurrency {
outerBuffer := &execResult{rows: make([]Row, 0, bufferCapacity)}
for !noMoreData && len(outerBuffer.rows) < bufferCapacity {
// Stop early once Close or a failed Next has marked the join finished.
if e.finished.Load().(bool) {
return
}
outerRow, err := e.outerExec.Next(goCtx)
if err != nil || outerRow == nil {
// A nil row means the outer side is exhausted; an error is carried to
// the worker in outerBuffer.err (errors.Trace(nil) is nil).
outerBuffer.err = errors.Trace(err)
noMoreData = true
break
}
outerBuffer.rows = append(outerBuffer.rows, outerRow)
}
// Nothing left to deliver: no rows and no error.
if noMoreData && len(outerBuffer.rows) == 0 && outerBuffer.err == nil {
break
}
// With the Done case commented out this select is a plain blocking send.
select {
// TODO: Recover the code.
// case <-e.ctx.GoCtx().Done():
// return
case e.outerBufferChs[i] <- outerBuffer:
if !noMoreData && bufferCapacity < maxBufferCapacity {
bufferCapacity <<= 1
}
}
}
}
// prepare runs the first time when 'Next' is called, it starts one worker goroutine to fetch rows from the big table,
// and reads all data from the small table to build a hash table, then starts multiple join worker goroutines.
// It returns early with an error, leaving e.prepared false, if reading,
// filtering, hashing or encoding an inner row fails.
func (e *HashJoinExec) prepare(goCtx goctx.Context) error {
e.hashTable = mvmap.NewMVMap()
// buffer is reused across rows as the destination of encodeRow.
var buffer []byte
for {
innerRow, err := e.innerlExec.Next(goCtx)
if err != nil {
return errors.Trace(err)
}
if innerRow == nil {
break
}
// Inner rows rejected by innerFilter never enter the hash table.
matched, err := expression.EvalBool(e.innerFilter, innerRow, e.ctx)
if err != nil {
return errors.Trace(err)
}
if !matched {
continue
}
// No worker is running yet, so the first worker's datum scratch space is
// borrowed here; a nil byte slice is passed so each key gets fresh storage.
hasNull, joinKey, err := getJoinKey(e.innerKeys, innerRow, e.hashJoinBuffers[0].data, nil)
if err != nil {
return errors.Trace(err)
}
// Rows with a NULL join key are not inserted.
if hasNull {
continue
}
buffer = buffer[:0]
buffer, err = e.encodeRow(buffer, innerRow)
if err != nil {
return errors.Trace(err)
}
e.hashTable.Put(joinKey, buffer)
}
e.prepared = true
e.resultBufferCh = make(chan *execResult, e.concurrency)
// If it's inner join and the small table is filtered out, there is no need to fetch big table and
// start join workers to do the join work. Otherwise, we start one goroutine to fetch outer rows
// and e.concurrency goroutines to concatenate the matched inner and outer rows and filter the result.
if !(e.hashTable.Len() == 0 && e.joinType == plan.InnerJoin) {
e.outerBufferChs = make([]chan *execResult, e.concurrency)
for i := 0; i < e.concurrency; i++ {
e.outerBufferChs[i] = make(chan *execResult, e.concurrency)
}
// Start a worker to fetch outer rows and partition them to join workers.
e.workerWaitGroup.Add(1)
go e.fetchOuterRows(goCtx)
// Start e.concurrency join workers to probe hash table and join inner and outer rows.
for i := 0; i < e.concurrency; i++ {
e.workerWaitGroup.Add(1)
go e.runJoinWorker(i)
}
}
// start a goroutine to wait join workers finish their job and close channels.
// When no worker was started the wait returns immediately, so Next sees a
// closed result channel and reports end of data.
go e.waitJoinWorkersAndCloseResultChan()
return nil
}
// waitJoinWorkersAndCloseResultChan blocks until the outer fetcher and all
// join workers have called Done, then closes resultBufferCh (signalling end
// of data to Next) and closeCh (releasing Close).
func (e *HashJoinExec) waitJoinWorkersAndCloseResultChan() {
e.workerWaitGroup.Wait()
close(e.resultBufferCh)
close(e.closeCh)
}
// filterOuters evaluates e.outerFilter on every row of outerBuffer and
// partitions outerBuffer.rows in place so that rows passing the filter come
// first. It returns how many rows passed. outerFilterResult is scratch space
// for the per-row verdicts. When there is no outer filter every row counts
// as passing and nothing is reordered.
func (e *HashJoinExec) filterOuters(outerBuffer *execResult, outerFilterResult []bool) (int, error) {
	rows := outerBuffer.rows
	if e.outerFilter == nil {
		return len(rows), nil
	}

	verdicts := outerFilterResult[:0]
	for idx := range rows {
		pass, err := expression.EvalBool(e.outerFilter, rows[idx], e.ctx)
		if err != nil {
			return 0, errors.Trace(err)
		}
		verdicts = append(verdicts, pass)
	}

	// Two-pointer partition: head walks right over passing rows, tail walks
	// left over failing rows, and a stuck pair is swapped.
	head, tail := 0, len(rows)-1
	for head <= tail {
		switch {
		case verdicts[head]:
			head++
		case !verdicts[tail]:
			tail--
		default:
			verdicts[head], verdicts[tail] = verdicts[tail], verdicts[head]
			rows[head], rows[tail] = rows[tail], rows[head]
		}
	}
	return head, nil
}
// runJoinWorker does join job in one goroutine.
//
// It receives batches of outer rows from e.outerBufferChs[workerID], filters
// them, probes the hash table for the rows that pass the outer filter and
// pushes result batches of up to bufferCapacity rows into e.resultBufferCh.
// The loop ends when the outer channel is closed, when the join is marked
// finished, or on the first error; any error is delivered to Next through
// the err field of the last result batch.
func (e *HashJoinExec) runJoinWorker(workerID int) {
	// Deferred so the wait group is released on every exit path.
	defer e.workerWaitGroup.Done()

	bufferCapacity := 1024
	resultBuffer := &execResult{rows: make([]Row, 0, bufferCapacity)}
	outerFilterResult := make([]bool, 0, bufferCapacity)

	var outerBuffer *execResult
	for ok := true; ok; {
		select {
		// TODO: Recover the code.
		// case <-e.ctx.GoCtx().Done():
		// ok = false
		case outerBuffer, ok = <-e.outerBufferChs[workerID]:
		}
		if !ok || e.finished.Load().(bool) {
			break
		}
		if outerBuffer.err != nil {
			resultBuffer.err = errors.Trace(outerBuffer.err)
			break
		}

		numMatchedOuters, err := e.filterOuters(outerBuffer, outerFilterResult)
		if err != nil {
			// Record the failure on the outgoing batch; storing it on the
			// incoming outerBuffer would drop it silently.
			resultBuffer.err = errors.Trace(err)
			break
		}

		// process unmatched outer rows.
		resultBuffer.rows = e.resultGenerator.emitUnMatchedOuters(outerBuffer.rows[numMatchedOuters:], resultBuffer.rows)

		// process matched outer rows.
		for _, outerRow := range outerBuffer.rows[:numMatchedOuters] {
			// Flush a full batch before joining the next outer row.
			if len(resultBuffer.rows) >= bufferCapacity {
				e.resultBufferCh <- resultBuffer
				resultBuffer = &execResult{rows: make([]Row, 0, bufferCapacity)}
			}
			// A false return means resultBuffer.err was set; it also ends
			// the outer loop through its condition.
			ok = e.joinOuterRow(workerID, outerRow, resultBuffer)
			if !ok {
				break
			}
		}
	}

	// Deliver the trailing partial batch and/or the error.
	if len(resultBuffer.rows) > 0 || resultBuffer.err != nil {
		e.resultBufferCh <- resultBuffer
	}
}
// joinOuterRow probes the hash table with one outer row and appends the
// resulting rows to resultBuffer.rows.
// An outer row whose key contains NULL, that has no entry in the hash table,
// or for which the result generator reports no match is emitted through
// emitUnMatchedOuter. It returns false, with resultBuffer.err set, when key
// computation, inner row decoding or result generation fails.
func (e *HashJoinExec) joinOuterRow(workerID int, outerRow Row, resultBuffer *execResult) bool {
	scratch := e.hashJoinBuffers[workerID]
	keyHasNull, key, err := getJoinKey(e.outerKeys, outerRow, scratch.data, scratch.bytes[:0:cap(scratch.bytes)])
	if err != nil {
		resultBuffer.err = errors.Trace(err)
		return false
	}
	if keyHasNull {
		resultBuffer.rows = e.resultGenerator.emitUnMatchedOuter(outerRow, resultBuffer.rows)
		return true
	}

	candidates := e.hashTable.Get(key)
	if len(candidates) == 0 {
		resultBuffer.rows = e.resultGenerator.emitUnMatchedOuter(outerRow, resultBuffer.rows)
		return true
	}

	// Decode every inner row stored under this key.
	innerRows := make([]Row, len(candidates))
	for idx, encoded := range candidates {
		decoded, decodeErr := e.decodeRow(encoded)
		if decodeErr != nil {
			resultBuffer.rows = nil
			resultBuffer.err = errors.Trace(decodeErr)
			return false
		}
		innerRows[idx] = decoded
	}

	var anyMatched bool
	resultBuffer.rows, anyMatched, err = e.resultGenerator.emitMatchedInners(outerRow, innerRows, resultBuffer.rows)
	if err != nil {
		resultBuffer.err = errors.Trace(err)
		return false
	}
	if anyMatched {
		return true
	}
	resultBuffer.rows = e.resultGenerator.emitUnMatchedOuter(outerRow, resultBuffer.rows)
	return true
}
// Next implements the Executor Next interface.
// The first call runs prepare. Rows are served from the current result
// batch; when it is used up the next batch is received from the join
// workers. A nil row with a nil error means the join is exhausted.
func (e *HashJoinExec) Next(goCtx goctx.Context) (Row, error) {
	if !e.prepared {
		if err := e.prepare(goCtx); err != nil {
			return nil, errors.Trace(err)
		}
	}

	if e.resultCursor >= len(e.resultBuffer) {
		e.resultCursor = 0
		// TODO: Recover the code.
		// case <-e.ctx.GoCtx().Done():
		// return nil, nil
		batch, open := <-e.resultBufferCh
		if !open {
			return nil, nil
		}
		if batch.err != nil {
			// Tell the background goroutines to stop.
			e.finished.Store(true)
			return nil, errors.Trace(batch.err)
		}
		e.resultBuffer = batch.rows
	}

	// Workers only send batches that are non-empty or carry an error, so the
	// current batch has a row at the cursor here.
	row := e.resultBuffer[e.resultCursor]
	e.resultCursor++
	return row, nil
}
// joinExec is the common interface of join algorithm except for hash join.
// ApplyJoinExec drives an implementation by calling fetchBigRow, prepare and
// doJoin in that order for every big row.
type joinExec interface {
Executor
// fetchBigRow fetches a valid row from big Exec and returns a bool value that means if it is matched.
// A nil row with a nil error means the big Exec is exhausted.
fetchBigRow(goCtx goctx.Context) (Row, bool, error)
// prepare reads all records from small Exec and stores them.
prepare(goCtx goctx.Context) error
// doJoin takes a row fetched from big exec and a bool value that means if it's matched with big filter,
// then returns all the result rows that match the on condition.
doJoin(Row, bool) ([]Row, error)
}
// NestedLoopJoinExec implements nested-loop algorithm for join.
// prepare caches the filtered rows of SmallExec; every row of BigExec is then
// combined with each cached row and checked against OtherFilter.
type NestedLoopJoinExec struct {
baseExecutor
// innerRows caches the SmallExec rows that passed SmallFilter.
innerRows []Row
// cursor is the index of the next row in resultRows to return from Next.
cursor int
// resultRows holds the rows produced by the latest doJoin call.
resultRows []Row
SmallExec Executor
BigExec Executor
// leftSmall means the small side's columns come first in a joined row.
leftSmall bool
// prepared is set once prepare has started reading SmallExec.
prepared bool
SmallFilter expression.CNFExprs
BigFilter expression.CNFExprs
// OtherFilter is evaluated on each joined (big, small) row pair.
OtherFilter expression.CNFExprs
// outer makes big rows without any match produce a row padded with
// defaultValues on the small side.
outer bool
defaultValues []types.Datum
}
// Close implements Executor interface.
// It drops the cached inner rows and pending results, then closes BigExec.
func (e *NestedLoopJoinExec) Close() error {
	e.innerRows, e.resultRows = nil, nil
	return e.BigExec.Close()
}
// Open implements Executor Open interface.
// It resets the join state, keeping the row slices' storage for reuse, and
// opens BigExec. SmallExec is opened later, in prepare.
func (e *NestedLoopJoinExec) Open(goCtx goctx.Context) error {
	e.prepared = false
	e.cursor = 0
	e.innerRows = e.innerRows[:0]
	e.resultRows = e.resultRows[:0]

	err := e.BigExec.Open(goCtx)
	return errors.Trace(err)
}
// fetchBigRow returns the next row of BigExec together with whether it
// satisfies BigFilter. Rows failing the filter are skipped unless this is an
// outer join, in which case they are returned with false. When BigExec is
// exhausted it is closed and a nil row is returned.
func (e *NestedLoopJoinExec) fetchBigRow(goCtx goctx.Context) (Row, bool, error) {
	for {
		bigRow, err := e.BigExec.Next(goCtx)
		switch {
		case err != nil:
			return nil, false, errors.Trace(err)
		case bigRow == nil:
			return nil, false, e.BigExec.Close()
		}

		passed, err := expression.EvalBool(e.BigFilter, bigRow, e.ctx)
		if err != nil {
			return nil, false, errors.Trace(err)
		}
		if passed || e.outer {
			return bigRow, passed, nil
		}
	}
}
// prepare runs the first time when 'Next' is called and it reads all data from the small table and stores
// them in a slice.
//
// SmallExec is opened with the caller's goCtx (the same context used for its
// Next calls) and is closed again before prepare returns. Only rows passing
// SmallFilter are kept in e.innerRows.
func (e *NestedLoopJoinExec) prepare(goCtx goctx.Context) error {
	// Use the caller's context rather than a placeholder so the small side
	// observes the same context as the rest of the query.
	if err := e.SmallExec.Open(goCtx); err != nil {
		return errors.Trace(err)
	}
	defer terror.Call(e.SmallExec.Close)

	e.innerRows = e.innerRows[:0]
	e.prepared = true
	for {
		row, err := e.SmallExec.Next(goCtx)
		if err != nil {
			return errors.Trace(err)
		}
		if row == nil {
			return nil
		}

		matched, err := expression.EvalBool(e.SmallFilter, row, e.ctx)
		if err != nil {
			return errors.Trace(err)
		}
		if matched {
			e.innerRows = append(e.innerRows, row)
		}
	}
}
// fillRowWithDefaultValue builds the result row for a big row without a
// match: the small side is as wide as SmallExec's schema and filled from
// e.defaultValues, and the two sides are ordered according to e.leftSmall.
func (e *NestedLoopJoinExec) fillRowWithDefaultValue(bigRow Row) (returnRow Row) {
	padding := make([]types.Datum, e.SmallExec.Schema().Len())
	copy(padding, e.defaultValues)

	if e.leftSmall {
		return makeJoinRow(padding, bigRow)
	}
	return makeJoinRow(bigRow, padding)
}
// doJoin joins bigRow with every cached inner row and returns the joined
// rows that satisfy OtherFilter. match reports whether bigRow passed
// BigFilter. For an outer join, a big row that failed the filter or found no
// partner yields one row padded with default values. The returned slice is
// e.resultRows and is overwritten by the next call.
func (e *NestedLoopJoinExec) doJoin(bigRow Row, match bool) ([]Row, error) {
	e.resultRows = e.resultRows[:0]

	if e.outer && !match {
		e.resultRows = append(e.resultRows, e.fillRowWithDefaultValue(bigRow))
		return e.resultRows, nil
	}

	for _, inner := range e.innerRows {
		// Column order depends on which side the small table is on.
		left, right := bigRow, inner
		if e.leftSmall {
			left, right = inner, bigRow
		}
		joined := makeJoinRow(left, right)

		keep, err := expression.EvalBool(e.OtherFilter, joined, e.ctx)
		if err != nil {
			return nil, errors.Trace(err)
		}
		if keep {
			e.resultRows = append(e.resultRows, joined)
		}
	}

	if e.outer && len(e.resultRows) == 0 {
		e.resultRows = append(e.resultRows, e.fillRowWithDefaultValue(bigRow))
	}
	return e.resultRows, nil
}
// Next implements the Executor interface.
// The first call runs prepare. Pending join results are returned one by one;
// when they run out, the next big row is fetched and joined. A nil row with a
// nil error means the big side is exhausted.
func (e *NestedLoopJoinExec) Next(goCtx goctx.Context) (Row, error) {
	if !e.prepared {
		if err := e.prepare(goCtx); err != nil {
			return nil, errors.Trace(err)
		}
	}

	// Refill resultRows until there is something to return.
	for e.cursor >= len(e.resultRows) {
		bigRow, match, err := e.fetchBigRow(goCtx)
		if bigRow == nil || err != nil {
			return bigRow, errors.Trace(err)
		}
		e.resultRows, err = e.doJoin(bigRow, match)
		if err != nil {
			return nil, errors.Trace(err)
		}
		e.cursor = 0
	}

	next := e.resultRows[e.cursor]
	e.cursor++
	return next, nil
}
// HashSemiJoinExec implements the hash join algorithm for semi join.
// prepare hashes the filtered rows of smallExec; each row of bigExec is then
// looked up and reported as matched or not.
type HashSemiJoinExec struct {
baseExecutor
// hashTable maps the hashed join key (as a string) to the small rows sharing it.
hashTable map[string][]Row
// smallHashKey and bigHashKey are the join key columns of each side.
smallHashKey []*expression.Column
bigHashKey []*expression.Column
smallExec Executor
bigExec Executor
// prepared is set once prepare has started reading smallExec.
prepared bool
smallFilter expression.CNFExprs
bigFilter expression.CNFExprs
// otherFilter is evaluated on a (big, small) joined row after the hash
// keys were found equal.
otherFilter expression.CNFExprs
// resultRows is a one-element slice reused to return the result of doJoin.
resultRows []Row
// auxMode is a mode that the result row always returns with an extra column which stores a boolean
// or NULL value to indicate if this row is matched.
auxMode bool
// smallTableHasNull records that some small row had a NULL join key.
smallTableHasNull bool
// anti is true, semi join only output the unmatched row.
anti bool
}
// Close implements the Executor Close interface.
// It releases the hash table and the result slot, then closes bigExec.
func (e *HashSemiJoinExec) Close() error {
	e.resultRows, e.hashTable = nil, nil
	return e.bigExec.Close()
}
// Open implements the Executor Open interface.
// It resets the join state with an empty hash table and a one-row result
// slot, then opens bigExec. smallExec is opened later, in prepare.
func (e *HashSemiJoinExec) Open(goCtx goctx.Context) error {
	e.resultRows = make([]Row, 1)
	e.hashTable = make(map[string][]Row)
	e.smallTableHasNull = false
	e.prepared = false

	err := e.bigExec.Open(goCtx)
	return errors.Trace(err)
}
// prepare runs the first time when 'Next' is called and it reads all data from the small table and stores
// them in a hash table.
//
// smallExec is opened with the caller's goCtx (the same context used for its
// Next calls) and is closed again before prepare returns. Rows rejected by
// smallFilter are skipped. Rows whose join key contains NULL are not stored
// but are remembered through e.smallTableHasNull.
func (e *HashSemiJoinExec) prepare(goCtx goctx.Context) error {
	// Use the caller's context rather than a placeholder so the small side
	// observes the same context as the rest of the query.
	if err := e.smallExec.Open(goCtx); err != nil {
		return errors.Trace(err)
	}
	defer terror.Call(e.smallExec.Close)

	e.hashTable = make(map[string][]Row)
	e.resultRows = make([]Row, 1)
	e.prepared = true

	// Scratch space for the evaluated key datums; getJoinKey overwrites every
	// slot it reads, so one allocation serves all rows.
	keyVals := make([]types.Datum, len(e.smallHashKey))
	for {
		row, err := e.smallExec.Next(goCtx)
		if err != nil {
			return errors.Trace(err)
		}
		if row == nil {
			return nil
		}

		matched, err := expression.EvalBool(e.smallFilter, row, e.ctx)
		if err != nil {
			return errors.Trace(err)
		}
		if !matched {
			continue
		}

		hasNull, hashcode, err := getJoinKey(e.smallHashKey, row, keyVals, nil)
		if err != nil {
			return errors.Trace(err)
		}
		if hasNull {
			e.smallTableHasNull = true
			continue
		}

		// Appending to the nil slice of a missing key creates the bucket.
		key := string(hashcode)
		e.hashTable[key] = append(e.hashTable[key], row)
	}
}
// rowIsMatched reports whether bigRow has a partner in the hash table.
// matched is true when some small row with the same hash key also satisfies
// otherFilter on the joined row. hasNull is true when the big row's join key
// contains NULL, in which case no lookup is done. On error both flags are
// false.
func (e *HashSemiJoinExec) rowIsMatched(bigRow Row) (matched bool, hasNull bool, err error) {
	// The scratch slice is sized by the key columns actually evaluated here.
	keyVals := make([]types.Datum, len(e.bigHashKey))
	hasNull, hashcode, err := getJoinKey(e.bigHashKey, bigRow, keyVals, nil)
	if err != nil {
		return false, false, errors.Trace(err)
	}
	if hasNull {
		return false, true, nil
	}

	// A missing key yields a nil slice, so the loop simply does not run.
	for _, smallRow := range e.hashTable[string(hashcode)] {
		joined := makeJoinRow(bigRow, smallRow)
		matched, err = expression.EvalBool(e.otherFilter, joined, e.ctx)
		if err != nil {
			return false, false, errors.Trace(err)
		}
		if matched {
			return true, false, nil
		}
	}
	return false, false, nil
}
// fetchBigRow returns the next row of bigExec together with whether it
// satisfies bigFilter. Rows failing the filter are skipped unless auxMode is
// on, in which case they are returned with false. When bigExec is exhausted
// it is closed and a nil row is returned.
func (e *HashSemiJoinExec) fetchBigRow(goCtx goctx.Context) (Row, bool, error) {
	for {
		bigRow, err := e.bigExec.Next(goCtx)
		switch {
		case err != nil:
			return nil, false, errors.Trace(err)
		case bigRow == nil:
			return nil, false, errors.Trace(e.bigExec.Close())
		}

		passed, err := expression.EvalBool(e.bigFilter, bigRow, e.ctx)
		if err != nil {
			return nil, false, errors.Trace(err)
		}
		if passed || e.auxMode {
			return bigRow, passed, nil
		}
	}
}
// doJoin decides the semi join outcome for one big row. match reports
// whether bigRow passed bigFilter.
// In auxMode the row is always returned with one extra datum appended:
// false when the big filter failed, NULL when the outcome is unknown because
// of NULL keys, otherwise the (anti-adjusted) match flag. Outside auxMode the
// row is returned only when it qualifies; otherwise the result is nil.
// The returned slice is the reused one-element e.resultRows.
func (e *HashSemiJoinExec) doJoin(bigRow Row, match bool) ([]Row, error) {
	if e.auxMode && !match {
		e.resultRows[0] = append(bigRow, types.NewDatum(false))
		return e.resultRows, nil
	}

	matched, isNull, err := e.rowIsMatched(bigRow)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Without a match, a NULL key on the small side makes the result unknown.
	isNull = isNull || (!matched && e.smallTableHasNull)
	// Anti semi join flips a definite outcome.
	if e.anti && !isNull {
		matched = !matched
	}

	if e.auxMode {
		var flag types.Datum
		if isNull {
			flag = types.NewDatum(nil)
		} else {
			flag = types.NewDatum(matched)
		}
		e.resultRows[0] = append(bigRow, flag)
		return e.resultRows, nil
	}

	if !matched {
		return nil, nil
	}
	e.resultRows[0] = bigRow
	return e.resultRows, nil
}
// Next implements the Executor Next interface.
// The first call runs prepare. Big rows are fetched until one of them
// produces a result row. A nil row with a nil error means bigExec is
// exhausted.
func (e *HashSemiJoinExec) Next(goCtx goctx.Context) (Row, error) {
	if !e.prepared {
		if err := e.prepare(goCtx); err != nil {
			return nil, errors.Trace(err)
		}
	}

	for {
		bigRow, match, err := e.fetchBigRow(goCtx)
		if bigRow == nil || err != nil {
			return bigRow, errors.Trace(err)
		}

		joined, err := e.doJoin(bigRow, match)
		if err != nil {
			return nil, errors.Trace(err)
		}
		if len(joined) == 0 {
			// This big row does not qualify; try the next one.
			continue
		}
		return joined[0], nil
	}
}
// ApplyJoinExec is the new logic of apply.
// For every big row it publishes the row's values through outerSchema and
// re-runs the wrapped join's prepare before joining.
type ApplyJoinExec struct {
baseExecutor
// join is the underlying join that is driven once per big row.
join joinExec
// outerSchema lists the correlated columns whose Data is overwritten with
// the current big row's values before the join's prepare is called.
outerSchema []*expression.CorrelatedColumn
// cursor is the index of the next row in resultRows to return from Next.
cursor int
// resultRows holds the rows produced by the latest doJoin call.
resultRows []Row
}
// Close implements the Executor interface.
// It closes the wrapped join and returns its error unchanged.
func (e *ApplyJoinExec) Close() error {
return e.join.Close()
}
// Open implements the Executor interface.
// It discards any pending results and opens the wrapped join.
func (e *ApplyJoinExec) Open(goCtx goctx.Context) error {
	e.resultRows = nil
	e.cursor = 0

	err := e.join.Open(goCtx)
	return errors.Trace(err)
}
// Next implements the Executor interface.
// Pending results are returned one by one. When they run out, the next big
// row is fetched, its values are written into the correlated columns, the
// wrapped join is prepared again and the row is joined. A nil row with a nil
// error means the big side is exhausted.
func (e *ApplyJoinExec) Next(goCtx goctx.Context) (Row, error) {
	// Refill resultRows until there is something to return.
	for e.cursor >= len(e.resultRows) {
		bigRow, match, err := e.join.fetchBigRow(goCtx)
		if err != nil || bigRow == nil {
			return nil, errors.Trace(err)
		}

		// Publish the current big row to the correlated columns before the
		// join's prepare runs.
		for _, corCol := range e.outerSchema {
			*corCol.Data = bigRow[corCol.Index]
		}

		if err = e.join.prepare(goCtx); err != nil {
			return nil, errors.Trace(err)
		}
		e.resultRows, err = e.join.doJoin(bigRow, match)
		if err != nil {
			return nil, errors.Trace(err)
		}
		e.cursor = 0
	}

	next := e.resultRows[e.cursor]
	e.cursor++
	return next, nil
}