// Source listing: tidb/pkg/executor/join/index_lookup_hash_join.go
// Snapshot taken 2025-12-31 10:12:31 +00:00 (974 lines, 29 KiB, Go).

// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package join
import (
"context"
"fmt"
"hash"
"hash/fnv"
"runtime"
"runtime/trace"
"sync"
"sync/atomic"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/channel"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/ranger"
)
const (
	// numResChkHold is the number of resource chunks that a single inner
	// worker may hold at the same time. It is used by two executors
	// independently:
	//  1. IndexMergeJoin.
	//  2. IndexNestedLoopHashJoin, but only when KeepOuterOrder is true.
	//     Otherwise there are at most `concurrency` resource chunks throughout
	//     the execution of IndexNestedLoopHashJoin.
	numResChkHold = 4

	// maxRowsPerFetch is the fetch size given to an inner worker whose joiner
	// supports incremental lookup (see newInnerWorker).
	maxRowsPerFetch = 4096
)
// IndexNestedLoopHashJoin employs one outer worker and N inner workers that
// execute concurrently. The output order is not promised unless KeepOuterOrder
// is set.
//
// The execution flow is very similar to IndexLookUpReader:
//
//  1. The outer worker reads N outer rows, builds a task and sends it to the
//     inner worker channel.
//  2. An inner worker receives the task and does three things for it:
//     a. builds a hash table from the outer rows;
//     b. builds key ranges from the outer rows and fetches the inner rows;
//     c. probes the hash table and sends the join result to the main thread
//     channel.
//     Note: steps a and b run concurrently.
//  3. The main thread receives the join results.
type IndexNestedLoopHashJoin struct {
IndexLookUpJoin
// resultCh carries join results from all inner workers to the main thread.
// It is only created when KeepOuterOrder is false (see startWorkers).
resultCh chan *indexHashJoinResult
// joinChkResourceCh holds one chunk pool per inner worker; chunks are handed
// back to the pool after the main thread has consumed a result.
joinChkResourceCh []chan *chunk.Chunk
// We build an individual joiner for each inner worker when using chunk-based
// execution, to avoid concurrent access to joiner.chk and joiner.selected.
Joiners []Joiner
// KeepOuterOrder selects the ordered execution path (runInOrder).
KeepOuterOrder bool
// curTask is the task whose results the main thread is currently draining
// in the ordered path.
curTask *indexHashJoinTask
// taskCh is only used when `KeepOuterOrder` is true.
taskCh chan *indexHashJoinTask
stats *indexLookUpJoinRuntimeStats
// prepared records whether the workers have been started by Next.
prepared bool
// panicErr records the error generated by panic recover. This is introduced to
// return the actual error message instead of `context cancelled` to the client.
// The embedded atomic.Bool flags that the error has been recorded; the
// embedded mutex serializes the first write (see finishJoinWorkers).
panicErr struct {
sync.Mutex
atomic.Bool
error
}
// ctxWithCancel is the cancellable context shared by all workers; it is
// created in startWorkers and cleared in Close.
ctxWithCancel context.Context
}
// indexHashJoinOuterWorker wraps outerWorker and publishes the tasks it builds
// to the inner workers (and, in ordered mode, to the main thread).
type indexHashJoinOuterWorker struct {
outerWorker
// innerCh is the channel the inner workers receive tasks from. The outer
// worker closes it when its run loop returns.
innerCh chan *indexHashJoinTask
keepOuterOrder bool
// taskCh is only used when the outer order needs to be promised.
taskCh chan *indexHashJoinTask
}
// indexHashJoinInnerWorker wraps innerWorker with the state needed to build
// the hash table for a task, probe it and emit join results.
type indexHashJoinInnerWorker struct {
innerWorker
// joiner is this worker's own Joiner instance.
joiner Joiner
// joinChkResourceCh is this worker's pool of reusable result chunks.
joinChkResourceCh chan *chunk.Chunk
// resultCh is valid only when IndexNestedLoopHashJoin does not need to keep
// the outer order. Otherwise, it will be nil.
resultCh chan *indexHashJoinResult
// taskCh delivers the tasks built by the outer worker.
taskCh <-chan *indexHashJoinTask
// wg is re-created for every task in handleTask and is used to wait for the
// hash-table-building goroutine of that task.
wg *sync.WaitGroup
// joinKeyBuf is a scratch buffer passed to the hashing helpers during the
// join phase.
joinKeyBuf []byte
// outerRowStatus is a reusable buffer filled by Joiner.TryToMatchOuters.
outerRowStatus []outerRowStatusFlag
// rowIter is a reusable iterator over the matched rows of the current probe.
rowIter *chunk.Iterator4Slice
}
// indexHashJoinResult is the unit passed from an inner worker to the main
// thread.
type indexHashJoinResult struct {
// chk holds the joined rows.
chk *chunk.Chunk
// err, when non-nil, is returned to the caller instead of chk.
err error
// src is the resource channel that chk is handed back to once the main
// thread has consumed it (see handleResult).
src chan<- *chunk.Chunk
}
// indexHashJoinTask is a lookUpJoinTask extended with the hash table built
// from its outer rows and the bookkeeping needed by the hash join.
type indexHashJoinTask struct {
*lookUpJoinTask
// outerRowStatus[chkIdx][rowIdx] tracks the match status of every outer row.
outerRowStatus [][]outerRowStatusFlag
// lookupMap is the hash table built by buildHashTableForOuterResult.
lookupMap BaseHashTable
// err carries an error from the outer worker (or a recovered panic) to the
// consumer of the task.
err error
keepOuterOrder bool
// resultCh is only used when the outer order needs to be promised.
resultCh chan *indexHashJoinResult
// matchedInnerRowPtrs is only valid when the outer order needs to be
// promised. Otherwise, it will be nil.
// len(matchedInnerRowPtrs) equals
// lookUpJoinTask.outerResult.NumChunks(), and
// matchedInnerRowPtrs[chkIdx][rowIdx] holds the pointers of the inner rows
// matched by the corresponding outer row.
matchedInnerRowPtrs [][][]chunk.RowPtr
}
// Open implements the IndexNestedLoopHashJoin Executor interface.
// It opens the outer child, (re)initializes the memory tracker and resets the
// per-execution state. Workers are not started here; that happens lazily on
// the first call to Next.
func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
	if err := exec.Open(ctx, e.Children(0)); err != nil {
		return err
	}

	// Reuse the tracker across executions when one already exists.
	if e.memTracker == nil {
		e.memTracker = memory.NewTracker(e.ID(), -1)
	} else {
		e.memTracker.Reset()
	}
	e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)

	e.cancelFunc = nil
	e.innerPtrBytes = make([][]byte, 0, 8)
	if e.RuntimeStats() != nil {
		e.stats = &indexLookUpJoinRuntimeStats{}
	}
	e.Finished.Store(false)
	return nil
}
// startWorkers creates the channels shared by the workers and launches one
// outer worker, `concurrency` inner workers and the goroutine that closes the
// output channels once every worker has finished.
func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context, initBatchSize int) {
	workerCnt := e.Ctx().GetSessionVars().IndexLookupJoinConcurrency()
	if e.stats != nil {
		e.stats.concurrency = workerCnt
	}

	workerCtx, cancelFunc := context.WithCancel(ctx)
	e.ctxWithCancel, e.cancelFunc = workerCtx, cancelFunc

	innerTaskCh := make(chan *indexHashJoinTask, workerCnt)
	if !e.KeepOuterOrder {
		e.resultCh = make(chan *indexHashJoinResult, workerCnt)
	} else {
		e.taskCh = make(chan *indexHashJoinTask, workerCnt)
		// When `KeepOuterOrder` is true, every task owns its own `resultCh`,
		// so no global resultCh is needed.
		e.resultCh = nil
	}
	e.joinChkResourceCh = make([]chan *chunk.Chunk, workerCnt)

	// Outer worker.
	e.WorkerWg.Add(1)
	outer := e.newOuterWorker(innerTaskCh, initBatchSize)
	go util.WithRecovery(func() { outer.run(e.ctxWithCancel) }, e.finishJoinWorkers)

	// Every inner worker gets a private pool of result chunks: a single chunk
	// in unordered mode, numResChkHold chunks when the outer order is kept.
	chksPerWorker := 1
	if e.KeepOuterOrder {
		chksPerWorker = numResChkHold
	}
	for i := range workerCnt {
		e.joinChkResourceCh[i] = make(chan *chunk.Chunk, chksPerWorker)
		for range chksPerWorker {
			e.joinChkResourceCh[i] <- exec.NewFirstChunk(e)
		}
	}

	// Inner workers.
	e.WorkerWg.Add(workerCnt)
	for workerID := range workerCnt {
		go util.WithRecovery(func() {
			e.newInnerWorker(innerTaskCh, workerID).run(e.ctxWithCancel, cancelFunc)
		}, e.finishJoinWorkers)
	}
	go e.wait4JoinWorkers()
}
// finishJoinWorkers is the recovery callback of every outer/inner worker
// goroutine. r is the recovered panic value, or nil when the worker returned
// normally. On panic it records the error, cancels the other workers and
// forwards the error to the main thread; in every case it marks the worker as
// done on WorkerWg.
func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r any) {
	if r == nil {
		e.WorkerWg.Done()
		return
	}

	// Record the panic error first to avoid returning context canceled.
	panicErr := fmt.Errorf("%v", r)
	if recoveredErr, isErr := r.(error); isErr {
		panicErr = recoveredErr
	}
	// Only the first panic is kept; the flag is re-checked under the lock.
	if !e.panicErr.Load() {
		e.panicErr.Lock()
		if !e.panicErr.Load() {
			e.panicErr.error = panicErr
			e.panicErr.Store(true)
		}
		e.panicErr.Unlock()
	}

	// Then cancel the workers.
	e.IndexLookUpJoin.Finished.Store(true)
	if e.cancelFunc != nil {
		e.cancelFunc()
	}

	// Finally hand the error to the main thread through the channel it reads.
	if e.KeepOuterOrder {
		e.taskCh <- &indexHashJoinTask{err: panicErr}
	} else {
		e.resultCh <- &indexHashJoinResult{err: panicErr}
	}
	e.WorkerWg.Done()
}
// wait4JoinWorkers blocks until every worker has called WorkerWg.Done and then
// closes whichever output channels exist, signalling the main thread that no
// more results or tasks will arrive.
func (e *IndexNestedLoopHashJoin) wait4JoinWorkers() {
	e.WorkerWg.Wait()
	if results := e.resultCh; results != nil {
		close(results)
	}
	if tasks := e.taskCh; tasks != nil {
		close(tasks)
	}
}
// Next implements the IndexNestedLoopHashJoin Executor interface.
// The workers are started on the first call; every call then resets req and
// fills it with at most one result chunk from the ordered or unordered path.
func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) error {
	if !e.prepared {
		e.startWorkers(ctx, req.RequiredRows())
		e.prepared = true
	}
	req.Reset()
	if !e.KeepOuterOrder {
		return e.runUnordered(e.ctxWithCancel, req)
	}
	return e.runInOrder(e.ctxWithCancel, req)
}
// runInOrder fills req with the next result while preserving the outer order:
// tasks are taken from taskCh one at a time and the results of the current
// task are drained completely before the next task is fetched.
func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error {
	for {
		if e.isDryUpTasks(ctx) {
			// No more tasks: surface a recorded panic error, if any.
			if !e.panicErr.Load() {
				return nil
			}
			return e.panicErr.error
		}
		if taskErr := e.curTask.err; taskErr != nil {
			return taskErr
		}
		res, err := e.getResultFromChannel(ctx, e.curTask.resultCh)
		if err != nil {
			return err
		}
		if res != nil {
			return e.handleResult(req, res)
		}
		// The current task's result channel is closed; move on to the next task.
		e.curTask = nil
	}
}
// runUnordered fills req with the next result received from the global
// resultCh, in whatever order the inner workers produce them.
func (e *IndexNestedLoopHashJoin) runUnordered(ctx context.Context, req *chunk.Chunk) error {
	res, err := e.getResultFromChannel(ctx, e.resultCh)
	if err == nil {
		return e.handleResult(req, res)
	}
	return err
}
// isDryUpTasks indicates whether all the tasks have been processed.
// When there is no current task it blocks until a task arrives on taskCh,
// taskCh is closed, or ctx is done; the latter two are reported as "dried up".
func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {
	if e.curTask != nil {
		return false
	}
	select {
	case next, open := <-e.taskCh:
		// On a closed channel next is nil, which leaves curTask unset.
		e.curTask = next
		return !open
	case <-ctx.Done():
		return true
	}
}
// getResultFromChannel waits for the next result on resultCh.
//
// It returns (nil, nil) when resultCh is closed, (nil, err) when the received
// result carries an error, and (result, nil) otherwise. When ctx is done first,
// the recorded panic error is preferred over ctx.Err() so that the client sees
// the real cause instead of a cancellation.
func (e *IndexNestedLoopHashJoin) getResultFromChannel(ctx context.Context, resultCh <-chan *indexHashJoinResult) (*indexHashJoinResult, error) {
	select {
	case res, open := <-resultCh:
		switch {
		case !open:
			return nil, nil
		case res.err != nil:
			return nil, res.err
		}
		return res, nil
	case <-ctx.Done():
		failpoint.Inject("TestIssue49692", func() {
			for !e.panicErr.Load() {
				runtime.Gosched()
			}
		})
		if e.panicErr.Load() {
			if recorded := e.panicErr.error; recorded != nil {
				return nil, recorded
			}
		}
		return nil, ctx.Err()
	}
}
// handleResult moves the columns of result.chk into req and hands the chunk
// back to the resource channel it came from. A nil result is a no-op.
func (*IndexNestedLoopHashJoin) handleResult(req *chunk.Chunk, result *indexHashJoinResult) error {
	if result != nil {
		req.SwapColumns(result.chk)
		// Recycle the chunk so the producing inner worker can reuse it.
		result.src <- result.chk
	}
	return nil
}
// Close implements the IndexNestedLoopHashJoin Executor interface.
// It cancels the workers, drains the output channels, closes the chunk
// resource channels and resets the per-execution state before closing the
// base executor.
func (e *IndexNestedLoopHashJoin) Close() error {
	if e.stats != nil {
		defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats)
	}
	if cancel := e.cancelFunc; cancel != nil {
		cancel()
	}
	if results := e.resultCh; results != nil {
		channel.Clear(results)
		e.resultCh = nil
	}
	if tasks := e.taskCh; tasks != nil {
		channel.Clear(tasks)
		e.taskCh = nil
	}
	for _, resourceCh := range e.joinChkResourceCh {
		close(resourceCh)
	}
	e.joinChkResourceCh = nil

	e.Finished.Store(false)
	e.prepared = false
	e.ctxWithCancel = nil
	return e.BaseExecutor.Close()
}
// run is the main loop of the outer worker. It repeatedly builds a task and
// publishes it to innerCh (read by the inner workers) and, when keepOuterOrder
// is set, also to taskCh (read by the main thread in task order). The loop ends
// when buildTask returns a nil task or an error, or when pushToChan reports
// that ctx is done. innerCh is closed on return.
func (ow *indexHashJoinOuterWorker) run(ctx context.Context) {
defer trace.StartRegion(ctx, "IndexHashJoinOuterWorker").End()
// Closing innerCh tells the inner workers that no more tasks will arrive.
defer close(ow.innerCh)
for {
failpoint.Inject("TestIssue30211", nil)
task, err := ow.buildTask(ctx)
failpoint.Inject("testIndexHashJoinOuterWorkerErr", func() {
err = errors.New("mockIndexHashJoinOuterWorkerErr")
})
failpoint.Inject("testIssue54055_1", func(val failpoint.Value) {
if val.(bool) {
err = errors.New("testIssue54055_1")
}
})
if err != nil {
// Wrap the error in a task so that it travels through the same channels
// as regular tasks.
task = &indexHashJoinTask{err: err}
if ow.keepOuterOrder {
// The outerBuilder and innerFetcher run concurrently, so we may
// get 2 errors simultaneously. Thus the capacity of task.resultCh
// needs to be initialized to 2 to avoid waiting.
task.keepOuterOrder, task.resultCh = true, make(chan *indexHashJoinResult, 2)
ow.pushToChan(ctx, task, ow.taskCh)
}
ow.pushToChan(ctx, task, ow.innerCh)
return
}
// A nil task without an error ends the loop.
if task == nil {
return
}
if finished := ow.pushToChan(ctx, task, ow.innerCh); finished {
return
}
if ow.keepOuterOrder {
failpoint.Inject("testIssue20779", func() {
panic("testIssue20779")
})
// Publish the same task to the main thread so that it drains the
// per-task result channels in the order the tasks were built.
if finished := ow.pushToChan(ctx, task, ow.taskCh); finished {
return
}
}
}
}
// buildTask builds the next lookUpJoinTask through the embedded outerWorker and
// wraps it into an indexHashJoinTask. It returns (nil, err) when the underlying
// builder fails and (nil, nil) when the builder yields no task.
//
// Every outer row gets a status slot; when the outer order must be kept, the
// task additionally gets its own result channel and a slot per outer row for
// the pointers of its matched inner rows.
func (ow *indexHashJoinOuterWorker) buildTask(ctx context.Context) (*indexHashJoinTask, error) {
	baseTask, err := ow.outerWorker.buildTask(ctx)
	if err != nil || baseTask == nil {
		return nil, err
	}

	chkCnt := baseTask.outerResult.NumChunks()
	hashTask := &indexHashJoinTask{
		lookUpJoinTask: baseTask,
		outerRowStatus: make([][]outerRowStatusFlag, chkCnt),
		keepOuterOrder: ow.keepOuterOrder,
	}
	if ow.keepOuterOrder {
		hashTask.resultCh = make(chan *indexHashJoinResult, numResChkHold)
		hashTask.matchedInnerRowPtrs = make([][][]chunk.RowPtr, chkCnt)
	}
	for i := range chkCnt {
		rowCnt := baseTask.outerResult.GetChunk(i).NumRows()
		hashTask.outerRowStatus[i] = make([]outerRowStatusFlag, rowCnt)
		if ow.keepOuterOrder {
			hashTask.matchedInnerRowPtrs[i] = make([][]chunk.RowPtr, rowCnt)
		}
	}
	return hashTask, nil
}
// pushToChan sends task to dst unless ctx is done first. It returns true when
// the send was abandoned because of ctx, i.e. when the caller should stop.
func (*indexHashJoinOuterWorker) pushToChan(ctx context.Context, task *indexHashJoinTask, dst chan<- *indexHashJoinTask) bool {
	select {
	case dst <- task:
		return false
	case <-ctx.Done():
		return true
	}
}
// newOuterWorker creates the outer worker that feeds innerCh. The initial batch
// size is initBatchSize capped by the session's IndexJoinBatchSize.
func (e *IndexNestedLoopHashJoin) newOuterWorker(innerCh chan *indexHashJoinTask, initBatchSize int) *indexHashJoinOuterWorker {
	batchLimit := e.Ctx().GetSessionVars().IndexJoinBatchSize
	return &indexHashJoinOuterWorker{
		outerWorker: outerWorker{
			OuterCtx:         e.OuterCtx,
			ctx:              e.Ctx(),
			executor:         e.Children(0),
			batchSize:        min(initBatchSize, batchLimit),
			maxBatchSize:     batchLimit,
			parentMemTracker: e.memTracker,
			lookup:           &e.IndexLookUpJoin,
		},
		innerCh:        innerCh,
		keepOuterOrder: e.KeepOuterOrder,
		taskCh:         e.taskCh,
	}
}
// supportIncrementalLookUp reports whether inner rows may be fetched and joined
// in several batches. That is the case only when the outer order need not be
// kept and the join is an inner, left outer, right outer or anti semi join.
func (e *IndexNestedLoopHashJoin) supportIncrementalLookUp() bool {
	if e.KeepOuterOrder {
		return false
	}
	switch JoinerType(e.Joiners[0]) {
	case base.InnerJoin, base.LeftOuterJoin, base.RightOuterJoin, base.AntiSemiJoin:
		return true
	default:
		return false
	}
}
// newInnerWorker creates the inner worker with index workerID. The worker gets
// private copies of every piece of state that would otherwise be shared between
// concurrently running inner workers: the index ranges, the joiner, the chunk
// resource channel and the TmpConstant slots of LastColHelper.
func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask, workerID int) *indexHashJoinInnerWorker {
	// Since multiple inner workers run concurrently, copy the join's
	// IndexRanges for every worker to avoid a data race.
	srcRanges := e.IndexRanges.Range()
	ownRanges := make([]*ranger.Range, len(srcRanges))
	for i, r := range srcRanges {
		ownRanges[i] = r.Clone()
	}

	var workerStats *innerWorkerRuntimeStats
	if e.stats != nil {
		workerStats = &e.stats.innerWorker
	}

	// If the joiner supports incremental lookup, inner results are fetched in
	// batches of maxRowsPerFetch rows until all are fetched. Otherwise the
	// limit stays 0 and all inner results are fetched in one batch.
	fetchLimit := 0
	if e.supportIncrementalLookUp() {
		fetchLimit = maxRowsPerFetch
	}

	iw := &indexHashJoinInnerWorker{
		innerWorker: innerWorker{
			InnerCtx:      e.InnerCtx,
			outerCtx:      e.OuterCtx,
			ctx:           e.Ctx(),
			maxFetchSize:  fetchLimit,
			indexRanges:   ownRanges,
			keyOff2IdxOff: e.KeyOff2IdxOff,
			stats:         workerStats,
			lookup:        &e.IndexLookUpJoin,
			memTracker:    memory.NewTracker(memory.LabelForIndexJoinInnerWorker, -1),
		},
		taskCh:            taskCh,
		joiner:            e.Joiners[workerID],
		joinChkResourceCh: e.joinChkResourceCh[workerID],
		resultCh:          e.resultCh,
		joinKeyBuf:        make([]byte, 1),
		outerRowStatus:    make([]outerRowStatusFlag, 0, e.MaxChunkSize()),
		rowIter:           chunk.NewIterator4Slice([]chunk.Row{}),
	}
	iw.memTracker.AttachTo(e.memTracker)

	if len(ownRanges) != 0 {
		// This memory must not be consumed in `iw.memTracker`: that tracker is
		// reset at the end of iw.handleTask, whereas the copied ranges live for
		// the whole active period of the inner worker.
		e.Ctx().GetSessionVars().StmtCtx.MemTracker.Consume(2 * types.EstimatedMemUsage(ownRanges[0].LowVal, len(ownRanges)))
	}

	if helper := e.LastColHelper; helper != nil {
		// TmpConstant needs to be re-created for every individual inner worker
		// to avoid a data race while the inner workers run concurrently.
		ownHelper := *helper
		ownHelper.TmpConstant = make([]*expression.Constant, len(helper.TmpConstant))
		for i := range ownHelper.TmpConstant {
			ownHelper.TmpConstant[i] = &expression.Constant{RetType: ownHelper.TargetCol.RetType}
		}
		iw.nextColCompareFilters = &ownHelper
	}
	return iw
}
// run is the main loop of an inner worker. It takes tasks from iw.taskCh and
// processes each with handleTask until the channel is closed, ctx is done, or
// an error stops the loop. cancelFunc is called when no result holder can be
// obtained, which cancels the shared worker context.
func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.CancelFunc) {
defer trace.StartRegion(ctx, "IndexHashJoinInnerWorker").End()
var task *indexHashJoinTask
// Acquire the first result holder (and its chunk) before any task is taken.
joinResult, ok := iw.getNewJoinResult(ctx)
if !ok {
cancelFunc()
return
}
h, resultCh := fnv.New64(), iw.resultCh
for {
// The previous task has been processed, so release the occupied memory
if task != nil {
task.memTracker.Detach()
}
select {
case <-ctx.Done():
return
case task, ok = <-iw.taskCh:
}
// taskCh has been closed: no more tasks. Note that this `break` leaves
// the for loop (it is outside the select).
if !ok {
break
}
// We need to init resultCh before the err is returned.
if task.keepOuterOrder {
resultCh = task.resultCh
}
if task.err != nil {
joinResult.err = task.err
break
}
err := iw.handleTask(ctx, task, joinResult, h, resultCh)
if err != nil && !task.keepOuterOrder {
// Only the non-keep-outer-order case needs checking here: in the
// keep-outer-order case handleTask itself has already reported the
// error on `resultCh`.
joinResult.err = err
break
}
if task.keepOuterOrder {
// We need to get a new result holder here because the old
// `joinResult` has been sent to the `resultCh` or to the
// `joinChkResourceCh`.
joinResult, ok = iw.getNewJoinResult(ctx)
if !ok {
cancelFunc()
return
}
}
}
failpoint.Inject("testIndexHashJoinInnerWorkerErr", func() {
joinResult.err = errors.New("mockIndexHashJoinInnerWorkerErr")
})
// When task.KeepOuterOrder is TRUE (resultCh != iw.resultCh):
// - the last joinResult will be handled when the task has been processed,
// thus we DO NOT need to check it here again.
// - we DO NOT check the error here either, because:
// - if the error is from task.err, the main thread will check the error of each task
// - if the error is from handleTask, the error will be handled in handleTask
// We should not check `task != nil && !task.KeepOuterOrder` here since it's
// possible that `join.chk.NumRows > 0` is true even if task == nil.
if resultCh == iw.resultCh {
// An error is always delivered, with a blocking send.
if joinResult.err != nil {
resultCh <- joinResult
return
}
// Flush the rows still buffered in the last, partially filled chunk.
if joinResult.chk != nil && joinResult.chk.NumRows() > 0 {
select {
case resultCh <- joinResult:
case <-ctx.Done():
return
}
}
}
}
// getNewJoinResult creates a fresh result holder and waits for a free chunk
// from the worker's resource channel. The boolean is false when the resource
// channel has been closed or when ctx is done first; in the latter case the
// holder's err is set to ctx.Err().
func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*indexHashJoinResult, bool) {
	holder := &indexHashJoinResult{src: iw.joinChkResourceCh}
	select {
	case chk, open := <-iw.joinChkResourceCh:
		holder.chk = chk
		return holder, open
	case <-ctx.Done():
		holder.err = ctx.Err()
		return holder, false
	}
}
// buildHashTableForOuterResult builds task.lookupMap from the outer rows of the
// task: every outer row that passes the outer filter and has no NULL in its
// hash columns is inserted under the hash of those columns. The build stops
// early, between chunks, once the join has been marked as finished. A hashing
// error is raised as a panic, which the invoker is expected to recover.
func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(task *indexHashJoinTask, h hash.Hash64) {
	failpoint.Inject("IndexHashJoinBuildHashTablePanic", nil)
	failpoint.Inject("ConsumeRandomPanic", nil)
	if iw.stats != nil {
		begin := time.Now()
		defer func() {
			atomic.AddInt64(&iw.stats.build, int64(time.Since(begin)))
		}()
	}

	keyBuf := make([]byte, 1)
	chkCnt := task.outerResult.NumChunks()
	task.lookupMap = newUnsafeHashTable(task.outerResult.Len())
	for chkIdx := range chkCnt {
		outerChk := task.outerResult.GetChunk(chkIdx)
		rowCnt := outerChk.NumRows()
		if iw.lookup.Finished.Load().(bool) {
			return
		}
		for rowIdx := range rowCnt {
			// Skip rows that were filtered out on the outer side.
			if task.outerMatch != nil && !task.outerMatch[chkIdx][rowIdx] {
				continue
			}
			outerRow := outerChk.GetRow(rowIdx)
			hashCols := iw.outerCtx.HashCols
			// Rows with a NULL hash column are not inserted.
			nullKey := false
			for _, col := range hashCols {
				if outerRow.IsNull(col) {
					nullKey = true
					break
				}
			}
			if nullKey {
				continue
			}
			h.Reset()
			err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx.TypeCtx(), h, outerRow, iw.outerCtx.HashTypes, hashCols, keyBuf)
			failpoint.Inject("testIndexHashJoinBuildErr", func() {
				err = errors.New("mockIndexHashJoinBuildErr")
			})
			if err != nil {
				// This panic will be recovered by the invoker.
				panic(err.Error())
			}
			task.lookupMap.Put(h.Sum64(), chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
		}
	}
}
// handleHashJoinInnerWorkerPanic is the completion callback of the goroutine
// that builds the hash table. It forwards err, if any, to resultCh and always
// marks the goroutine as done on both the per-task and the join-wide wait
// groups.
func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(resultCh chan *indexHashJoinResult, err error) {
	defer func() {
		iw.wg.Done()
		iw.lookup.WorkerWg.Done()
	}()
	if err == nil {
		return
	}
	resultCh <- &indexHashJoinResult{err: err}
}
// handleTask processes one task on an inner worker:
//
//  1. it starts a goroutine that builds the hash table from the outer rows;
//  2. concurrently it constructs the lookup contents and fetches the inner rows;
//  3. after both have finished it joins, either unordered (possibly in several
//     fetch/join rounds while task.innerExec is still open) or in outer order.
//
// joinResult is the result holder to start filling and h is the reusable hash
// state. resultCh is the channel results are sent to; in keep-outer-order mode
// it is the task's own channel, and handleTask closes it before returning.
//
// The returned error is the first of: lookup/fetch error, hash-table build
// error, join error. In keep-outer-order mode the error is additionally
// reported on resultCh.
func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) (err error) {
	defer func() {
		iw.memTracker.Consume(-iw.memTracker.BytesConsumed())
		if task.keepOuterOrder {
			if err != nil {
				// Report the error in a fresh holder. The holder passed in as
				// `joinResult` may already have been sent to resultCh by
				// doJoinInOrder, so writing its err field here would race with
				// the consumer reading it. The consumer only looks at err for
				// a failed result, so an error-only holder is sufficient.
				select {
				case <-ctx.Done():
				case resultCh <- &indexHashJoinResult{err: err}:
				}
			}
			// The task's result channel is complete, successfully or not.
			close(resultCh)
		}
	}()
	failpoint.Inject("testIssue54055_2", func(val failpoint.Value) {
		if val.(bool) {
			time.Sleep(10 * time.Millisecond)
			panic("testIssue54055_2")
		}
	})
	var joinStartTime time.Time
	if iw.stats != nil {
		start := time.Now()
		defer func() {
			endTime := time.Now()
			atomic.AddInt64(&iw.stats.totalTime, int64(endTime.Sub(start)))
			// Only used for doJoinInOrder. doJoinUnordered will calculate join time in itself.
			// FetchInnerResults maybe return err and return, so joinStartTime is not initialized.
			if !joinStartTime.IsZero() {
				atomic.AddInt64(&iw.stats.join, int64(endTime.Sub(joinStartTime)))
			}
		}()
	}

	// Build the hash table in the background while the inner rows are fetched.
	// Both wait groups are released by handleHashJoinInnerWorkerPanic.
	iw.wg = &sync.WaitGroup{}
	iw.wg.Add(1)
	iw.lookup.WorkerWg.Add(1)
	var buildHashTableErr error
	// TODO(XuHuaiyu): we may always use the smaller side to build the hashtable.
	go util.WithRecovery(
		func() {
			iw.buildHashTableForOuterResult(task, h)
		},
		func(r any) {
			if r != nil {
				buildHashTableErr = errors.Errorf("%v", r)
			}
			iw.handleHashJoinInnerWorkerPanic(resultCh, buildHashTableErr)
		},
	)
	lookUpContents, err := iw.constructLookupContent(task.lookUpJoinTask)
	if err == nil {
		err = iw.innerWorker.fetchInnerResults(ctx, task.lookUpJoinTask, lookUpContents)
	}
	iw.wg.Wait()
	// check error after wg.Wait to make sure error message can be sent to
	// resultCh even if panic happen in buildHashTableForOuterResult.
	failpoint.Inject("IndexHashJoinFetchInnerResultsErr", func() {
		err = errors.New("IndexHashJoinFetchInnerResultsErr")
	})
	failpoint.Inject("ConsumeRandomPanic", nil)
	if err != nil {
		return err
	}
	if buildHashTableErr != nil {
		return buildHashTableErr
	}

	if !task.keepOuterOrder {
		for {
			err = iw.doJoinUnordered(ctx, task, joinResult, h, resultCh)
			if err != nil {
				if task.innerExec != nil {
					terror.Log(exec.Close(task.innerExec))
					task.innerExec = nil
				}
				return err
			}
			// innerExec not finished, we need to fetch inner results and do join again.
			if task.innerExec != nil {
				err = iw.innerWorker.fetchInnerResults(ctx, task.lookUpJoinTask, lookUpContents)
				if err != nil {
					return err
				}
				continue
			}
			break
		}
		return nil
	}
	joinStartTime = time.Now()
	return iw.doJoinInOrder(ctx, task, joinResult, h, resultCh)
}
// doJoinUnordered joins the inner rows fetched so far for task and sends every
// full result chunk to resultCh. While task.innerExec is still open, only the
// matching phase runs; once it is finished, OnMissMatch is additionally called
// for every outer row that never matched.
func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error {
	if iw.stats != nil {
		begin := time.Now()
		defer func() {
			atomic.AddInt64(&iw.stats.join, int64(time.Since(begin)))
		}()
	}

	var ok bool
	innerIter := chunk.NewIterator4List(task.innerResult)
	for innerRow := innerIter.Begin(); innerRow != innerIter.End(); innerRow = innerIter.Next() {
		if ok, joinResult = iw.joinMatchedInnerRow2Chunk(ctx, innerRow, task, joinResult, h, iw.joinKeyBuf); !ok {
			return joinResult.err
		}
	}
	if task.innerExec != nil {
		return nil
	}

	// Only when innerExec is finished can OnMissMatch be called for the
	// unmatched outer rows.
	for chkIdx, statuses := range task.outerRowStatus {
		outerChk := task.outerResult.GetChunk(chkIdx)
		for rowIdx, status := range statuses {
			if status == outerRowMatched {
				continue
			}
			iw.joiner.OnMissMatch(status == outerRowHasNull, outerChk.GetRow(rowIdx), joinResult.chk)
			if !joinResult.chk.IsFull() {
				continue
			}
			// The chunk is full: ship it and continue with a fresh holder.
			select {
			case resultCh <- joinResult:
			case <-ctx.Done():
				return ctx.Err()
			}
			if joinResult, ok = iw.getNewJoinResult(ctx); !ok {
				return errors.New("indexHashJoinInnerWorker.doJoinUnordered failed")
			}
		}
	}
	return nil
}
// getMatchedOuterRows probes task.lookupMap with the hash of innerRow's join
// key and returns the outer rows whose key really equals it, together with
// their row pointers. For semi joins, outer rows already marked as matched are
// left out. Both slices are nil when nothing matches.
func (iw *indexHashJoinInnerWorker) getMatchedOuterRows(innerRow chunk.Row, task *indexHashJoinTask, h hash.Hash64, buf []byte) (matchedRows []chunk.Row, matchedRowPtr []chunk.RowPtr, err error) {
	h.Reset()
	if err = codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx.TypeCtx(), h, innerRow, iw.HashTypes, iw.HashCols, buf); err != nil {
		return nil, nil, err
	}
	entry := task.lookupMap.Get(h.Sum64())
	if entry == nil {
		return nil, nil, nil
	}

	joinType := JoinerType(iw.joiner)
	semiJoin := joinType.IsSemiJoin()
	for ; entry != nil; entry = entry.Next {
		outerPtr := entry.Ptr
		outerRow := task.outerResult.GetRow(outerPtr)
		// Equal hashes do not imply equal keys, so compare the keys themselves.
		equal, cmpErr := codec.EqualChunkRow(iw.ctx.GetSessionVars().StmtCtx.TypeCtx(), innerRow, iw.HashTypes, iw.HashCols, outerRow, iw.outerCtx.HashTypes, iw.outerCtx.HashCols)
		if cmpErr != nil {
			return nil, nil, cmpErr
		}
		if !equal {
			continue
		}
		if semiJoin && task.outerRowStatus[outerPtr.ChkIdx][outerPtr.RowIdx] == outerRowMatched {
			continue
		}
		matchedRows = append(matchedRows, outerRow)
		matchedRowPtr = append(matchedRowPtr, chunk.RowPtr{ChkIdx: outerPtr.ChkIdx, RowIdx: outerPtr.RowIdx})
	}
	return matchedRows, matchedRowPtr, nil
}
// joinMatchedInnerRow2Chunk joins innerRow with every outer row of task that
// matches it and appends the output to joinResult.chk. Whenever the chunk
// becomes full, the holder is sent to iw.resultCh and replaced by a new one.
//
// It returns the holder the caller must continue with, and false when the
// caller should stop; in that case the cause is expected in the returned
// holder's err field.
func (iw *indexHashJoinInnerWorker) joinMatchedInnerRow2Chunk(ctx context.Context, innerRow chunk.Row, task *indexHashJoinTask,
joinResult *indexHashJoinResult, h hash.Hash64, buf []byte) (bool, *indexHashJoinResult) {
matchedOuterRows, matchedOuterRowPtr, err := iw.getMatchedOuterRows(innerRow, task, h, buf)
if err != nil {
joinResult.err = err
return false, joinResult
}
if len(matchedOuterRows) == 0 {
return true, joinResult
}
var ok bool
// cursor indexes matchedOuterRowPtr and advances by one for every status
// reported by TryToMatchOuters.
cursor := 0
iw.rowIter.Reset(matchedOuterRows)
iter := iw.rowIter
// The loop has no post statement; presumably TryToMatchOuters advances the
// iterator itself — confirm against the Joiner implementation.
for iw.rowIter.Begin(); iter.Current() != iter.End(); {
iw.outerRowStatus, err = iw.joiner.TryToMatchOuters(iter, innerRow, joinResult.chk, iw.outerRowStatus)
if err != nil {
joinResult.err = err
return false, joinResult
}
for _, status := range iw.outerRowStatus {
chkIdx, rowIdx := matchedOuterRowPtr[cursor].ChkIdx, matchedOuterRowPtr[cursor].RowIdx
// A "matched" status always wins; any other status is recorded only
// while the row is still in the outerRowUnmatched state.
if status == outerRowMatched || task.outerRowStatus[chkIdx][rowIdx] == outerRowUnmatched {
task.outerRowStatus[chkIdx][rowIdx] = status
}
cursor++
}
if joinResult.chk.IsFull() {
// Ship the full chunk, then continue with a fresh holder.
select {
case iw.resultCh <- joinResult:
case <-ctx.Done():
joinResult.err = ctx.Err()
return false, joinResult
}
failpoint.InjectCall("joinMatchedInnerRow2Chunk")
joinResult, ok = iw.getNewJoinResult(ctx)
if !ok {
return false, joinResult
}
}
}
return true, joinResult
}
// collectMatchedInnerPtrs4OuterRows records innerRowPtr in
// task.matchedInnerRowPtrs for every outer row of task that matches innerRow.
func (iw *indexHashJoinInnerWorker) collectMatchedInnerPtrs4OuterRows(innerRow chunk.Row, innerRowPtr chunk.RowPtr,
	task *indexHashJoinTask, h hash.Hash64, buf []byte) error {
	_, outerPtrs, err := iw.getMatchedOuterRows(innerRow, task, h, buf)
	if err != nil {
		return err
	}
	for _, outerPtr := range outerPtrs {
		matched := &task.matchedInnerRowPtrs[outerPtr.ChkIdx][outerPtr.RowIdx]
		*matched = append(*matched, innerRowPtr)
	}
	return nil
}
// doJoinInOrder joins task while preserving the order of the outer rows. It
// follows these steps:
//  1. collect all the matched inner row ptrs for every outer row
//  2. do the join work, outer row by outer row:
//     2.1 collect all the matched inner rows using the collected ptrs
//     2.2 call TryToMatchInners for the outer row
//     2.3 call OnMissMatch when no inner rows are matched
//
// Full result chunks are sent to resultCh as they are produced; the final,
// partially filled chunk is handled by the deferred function below.
func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) (err error) {
// On success, flush the last chunk if it holds rows; otherwise hand the unused
// chunk straight back to its resource channel. The closure reads the
// joinResult variable at return time, i.e. the most recent holder.
defer func() {
if err == nil && joinResult.chk != nil {
if joinResult.chk.NumRows() > 0 {
select {
case resultCh <- joinResult:
case <-ctx.Done():
return
}
} else {
joinResult.src <- joinResult.chk
}
}
}()
// Step 1: for every inner row, remember its pointer on each outer row it matches.
for i, numChunks := 0, task.innerResult.NumChunks(); i < numChunks; i++ {
for j, chk := 0, task.innerResult.GetChunk(i); j < chk.NumRows(); j++ {
row := chk.GetRow(j)
ptr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)}
err = iw.collectMatchedInnerPtrs4OuterRows(row, ptr, task, h, iw.joinKeyBuf)
failpoint.Inject("TestIssue31129", func() {
err = errors.New("TestIssue31129")
})
if err != nil {
return err
}
}
}
// TODO: matchedInnerRowPtrs and matchedInnerRows can be moved to inner worker.
matchedInnerRows := make([]chunk.Row, 0, len(task.matchedInnerRowPtrs))
var hasMatched, hasNull, ok bool
// Step 2: walk the outer rows in their original order.
for chkIdx, innerRowPtrs4Chk := range task.matchedInnerRowPtrs {
for outerRowIdx, innerRowPtrs := range innerRowPtrs4Chk {
// Reuse the scratch slice and reset the per-outer-row flags.
matchedInnerRows, hasMatched, hasNull = matchedInnerRows[:0], false, false
outerRow := task.outerResult.GetChunk(chkIdx).GetRow(outerRowIdx)
for _, ptr := range innerRowPtrs {
matchedInnerRows = append(matchedInnerRows, task.innerResult.GetRow(ptr))
}
iw.rowIter.Reset(matchedInnerRows)
iter := iw.rowIter
for iter.Begin(); iter.Current() != iter.End(); {
// NOTE: this err shadows the named result; the explicit `return err`
// below still assigns it to the result, so the deferred function
// observes the failure.
matched, isNull, err := iw.joiner.TryToMatchInners(outerRow, iter, joinResult.chk)
if err != nil {
return err
}
hasMatched, hasNull = matched || hasMatched, isNull || hasNull
if joinResult.chk.IsFull() {
// Ship the full chunk, then continue with a fresh holder.
select {
case resultCh <- joinResult:
case <-ctx.Done():
return ctx.Err()
}
joinResult, ok = iw.getNewJoinResult(ctx)
if !ok {
return errors.New("indexHashJoinInnerWorker.doJoinInOrder failed")
}
}
}
if !hasMatched {
iw.joiner.OnMissMatch(hasNull, outerRow, joinResult.chk)
}
}
}
return nil
}